Skip to main content

rustack_s3_core/ops/
object.rs

1//! Object CRUD operation handlers.
2//!
3//! Implements `put_object`, `get_object`, `head_object`, `delete_object`,
4//! `delete_objects`, and `copy_object`.
5
6use std::{collections::HashMap, str::FromStr};
7
8use bytes::Bytes;
9use chrono::Utc;
10use rustack_s3_model::{
11    error::{S3Error, S3ErrorCode},
12    input::{
13        CopyObjectInput, DeleteObjectInput, DeleteObjectsInput, GetObjectInput, HeadObjectInput,
14        PutObjectInput,
15    },
16    output::{
17        CopyObjectOutput, DeleteObjectOutput, DeleteObjectsOutput, GetObjectOutput,
18        HeadObjectOutput, PutObjectOutput,
19    },
20    request::StreamingBlob,
21    types::{
22        ChecksumType, CopyObjectResult, DeletedObject, MetadataDirective, ObjectCannedACL,
23        ObjectLockLegalHoldStatus, ObjectLockMode, ServerSideEncryption, StorageClass,
24    },
25};
26use tracing::debug;
27
28use crate::{
29    checksums::{ChecksumAlgorithm, compute_checksum},
30    error::S3ServiceError,
31    provider::RustackS3,
32    state::{
33        keystore::ObjectStore,
34        object::{CannedAcl, ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
35    },
36    utils::{is_valid_if_match, is_valid_if_none_match, parse_copy_source, parse_range_header},
37    validation::{validate_content_md5, validate_metadata, validate_object_key},
38};
39
40/// Check whether Object Lock (legal hold or retention) prevents deletion of a
41/// specific object version.
42///
43/// # Errors
44///
45/// Returns `AccessDenied` if the version has a legal hold enabled or an active
46/// retention period that cannot be bypassed.
47///
48/// AWS S3 rules:
49/// - DELETE *without* a version ID always succeeds (creates a delete marker).
50/// - DELETE *with* a version ID must be rejected if the version has a legal hold enabled or a
51///   retention period that has not yet expired.
52/// - `BypassGovernanceRetention` allows skipping GOVERNANCE-mode retention checks, but never
53///   COMPLIANCE-mode or legal holds.
54///
55/// Returns `Ok(())` when the deletion is allowed.
56#[allow(clippy::result_large_err)]
57fn check_object_lock_for_delete(
58    store: &ObjectStore,
59    key: &str,
60    version_id: &str,
61    bypass_governance: bool,
62) -> Result<(), S3Error> {
63    let Some(obj) = store.get_version(key, version_id) else {
64        // Version not found — nothing to protect.
65        return Ok(());
66    };
67
68    if obj.metadata.object_lock_legal_hold == Some(true) {
69        return Err(S3Error::with_message(
70            S3ErrorCode::AccessDenied,
71            "Object Lock legal hold is enabled on this object",
72        ));
73    }
74
75    if let Some(retain_until) = obj.metadata.object_lock_retain_until {
76        if retain_until > Utc::now() {
77            let is_governance = obj.metadata.object_lock_mode.as_deref() == Some("GOVERNANCE");
78            if is_governance && bypass_governance {
79                // GOVERNANCE mode retention can be bypassed.
80            } else {
81                return Err(S3Error::with_message(
82                    S3ErrorCode::AccessDenied,
83                    "Object Lock retention period has not expired",
84                ));
85            }
86        }
87    }
88
89    Ok(())
90}
91
92// AWS S3 DTOs use signed integers (i32/i64) for inherently non-negative values
93// (sizes, part counts). Casting from u64/u32/usize is safe in practice.
94// These handler methods must remain async because some operations involve
95// storage I/O.
96#[allow(
97    clippy::cast_possible_wrap,
98    clippy::cast_possible_truncation,
99    clippy::cast_sign_loss,
100    clippy::unused_async
101)]
102impl RustackS3 {
103    /// Put (upload) a new object.
104    pub async fn handle_put_object(
105        &self,
106        mut input: PutObjectInput,
107    ) -> Result<PutObjectOutput, S3Error> {
108        let bucket_name = input.bucket.clone();
109        let key = input.key.clone();
110
111        validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
112
113        // Verify bucket exists.
114        let bucket = self
115            .state
116            .get_bucket(&bucket_name)
117            .map_err(S3ServiceError::into_s3_error)?;
118
119        // Take the body out before borrowing other fields from input.
120        let body_data = input.body.take().map_or_else(Bytes::new, |b| b.data);
121
122        // Validate Content-MD5 if provided.
123        validate_content_md5(input.content_md5.as_deref(), &body_data)
124            .map_err(S3ServiceError::into_s3_error)?;
125
126        // Extract metadata from the request.
127        let metadata = build_metadata(&input);
128        validate_metadata(&metadata.user_metadata).map_err(S3ServiceError::into_s3_error)?;
129
130        // Determine version ID based on versioning status.
131        let version_id = if bucket.is_versioning_enabled() {
132            crate::utils::generate_version_id()
133        } else {
134            "null".to_owned()
135        };
136
137        // Write to storage.
138        let write_result = self
139            .storage
140            .write_object(&bucket_name, &key, &version_id, body_data.clone())
141            .await
142            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
143
144        // Extract checksum from the request, or compute CRC32 by default.
145        let client_checksum =
146            extract_checksum_from_put(&input).map_err(S3ServiceError::into_s3_error)?;
147        let is_client_provided = client_checksum.is_some();
148
149        let checksum = client_checksum.unwrap_or_else(|| ChecksumData {
150            algorithm: "CRC32".to_owned(),
151            value: compute_checksum(ChecksumAlgorithm::Crc32, &body_data),
152            checksum_type: "FULL_OBJECT".to_owned(),
153        });
154
155        // Validate client-provided checksum against server-computed value.
156        if is_client_provided {
157            if let Ok(algo) = ChecksumAlgorithm::from_str(&checksum.algorithm) {
158                let computed = compute_checksum(algo, &body_data);
159                if checksum.value != computed {
160                    return Err(S3ServiceError::BadDigest.into_s3_error());
161                }
162            }
163        }
164
165        // Build the S3Object.
166        let owner = InternalOwner::default();
167        let obj = S3Object {
168            key: key.clone(),
169            version_id: version_id.clone(),
170            etag: write_result.etag.clone(),
171            size: write_result.size,
172            last_modified: Utc::now(),
173            storage_class: input
174                .storage_class
175                .as_ref()
176                .map_or_else(|| "STANDARD".to_owned(), StorageClass::as_str_owned),
177            metadata,
178            owner,
179            checksum: Some(checksum.clone()),
180            parts_count: None,
181            part_etags: Vec::new(),
182        };
183
184        // Store the object metadata.
185        {
186            let mut store = bucket.objects.write();
187            store.put(obj);
188        }
189
190        debug!(bucket = %bucket_name, key = %key, version_id = %version_id, "put_object completed");
191
192        let real_version_id = if version_id == "null" {
193            None
194        } else {
195            Some(version_id)
196        };
197
198        let cksum = checksum_to_fields(&checksum);
199        Ok(PutObjectOutput {
200            e_tag: Some(write_result.etag),
201            version_id: real_version_id,
202            checksum_crc32: cksum.crc32,
203            checksum_crc32c: cksum.crc32c,
204            checksum_crc64nvme: cksum.crc64nvme,
205            checksum_sha1: cksum.sha1,
206            checksum_sha256: cksum.sha256,
207            checksum_type: cksum.checksum_type,
208            ..PutObjectOutput::default()
209        })
210    }
211
212    /// Get (download) an object.
213    #[allow(clippy::too_many_lines)]
214    pub async fn handle_get_object(
215        &self,
216        input: GetObjectInput,
217    ) -> Result<GetObjectOutput, S3Error> {
218        let bucket_name = input.bucket;
219        let key = input.key;
220        let version_id_param = input.version_id;
221        let if_match_param = input.if_match;
222        let if_none_match_param = input.if_none_match;
223        let range_param = input.range;
224        let checksum_mode = input.checksum_mode;
225
226        // S3 response header overrides (from query parameters in presigned URLs).
227        let override_cache_control = input.response_cache_control;
228        let override_content_disposition = input.response_content_disposition;
229        let override_content_encoding = input.response_content_encoding;
230        let override_content_language = input.response_content_language;
231        let override_content_type = input.response_content_type;
232        let override_expires = input.response_expires;
233
234        // Look up the object and extract all needed data while holding the lock.
235        // The lock must be dropped before any `.await` calls since parking_lot
236        // guards are `!Send`.
237        let (
238            obj_size,
239            obj_etag,
240            obj_last_modified,
241            obj_version_id,
242            obj_storage_class,
243            obj_meta,
244            obj_parts_count,
245            obj_checksum,
246            version_for_storage,
247        ) = {
248            let bucket = self
249                .state
250                .get_bucket(&bucket_name)
251                .map_err(S3ServiceError::into_s3_error)?;
252
253            let store = bucket.objects.read();
254            let obj = if let Some(ref version_id) = version_id_param {
255                store.get_version(&key, version_id).ok_or_else(|| {
256                    // Check if the version is a delete marker.
257                    if store.is_delete_marker(&key, version_id) {
258                        S3ServiceError::MethodNotAllowed
259                            .into_s3_error()
260                            .with_header("x-amz-delete-marker", "true")
261                            .with_header("x-amz-version-id", version_id.clone())
262                    } else {
263                        S3ServiceError::NoSuchVersion {
264                            key: key.clone(),
265                            version_id: version_id.clone(),
266                        }
267                        .into_s3_error()
268                    }
269                })?
270            } else {
271                store
272                    .get(&key)
273                    .ok_or_else(|| S3ServiceError::NoSuchKey { key: key.clone() }.into_s3_error())?
274            };
275
276            // Conditional request checks.
277            if let Some(ref if_match) = if_match_param {
278                if !is_valid_if_match(&obj.etag, if_match) {
279                    return Err(S3ServiceError::PreconditionFailed.into_s3_error());
280                }
281            }
282            if let Some(ref if_none_match) = if_none_match_param {
283                if !is_valid_if_none_match(&obj.etag, if_none_match) {
284                    return Err(S3ServiceError::NotModified.into_s3_error());
285                }
286            }
287
288            let version_id_opt = if obj.version_id == "null" {
289                None
290            } else {
291                Some(obj.version_id.clone())
292            };
293
294            (
295                obj.size,
296                obj.etag.clone(),
297                obj.last_modified,
298                version_id_opt,
299                obj.storage_class.clone(),
300                obj.metadata.clone(),
301                obj.parts_count,
302                obj.checksum.clone(),
303                obj.version_id.clone(),
304            )
305        };
306
307        // Parse range header if provided.
308        let range = if let Some(ref range_value) = range_param {
309            let (start, end) =
310                parse_range_header(range_value, obj_size).map_err(S3ServiceError::into_s3_error)?;
311            Some((start, end))
312        } else {
313            None
314        };
315
316        // Read data from storage.
317        let data = self
318            .storage
319            .read_object(&bucket_name, &key, &version_for_storage, range)
320            .await
321            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
322
323        let content_length = data.len() as i64;
324
325        // Build the streaming body from the data bytes.
326        let body = StreamingBlob::new(data);
327
328        let content_range = range.map(|(start, end)| format!("bytes {start}-{end}/{obj_size}"));
329
330        let content_type = Some(
331            obj_meta
332                .content_type
333                .clone()
334                .unwrap_or_else(|| "binary/octet-stream".to_owned()),
335        );
336
337        let metadata = if obj_meta.user_metadata.is_empty() {
338            HashMap::default()
339        } else {
340            obj_meta.user_metadata.clone()
341        };
342
343        // Only return checksums when ChecksumMode=ENABLED (matching AWS behavior)
344        // and the request is for the full object (not a range request). A range
345        // response covers a subset of the object data, so the full-object
346        // checksum would not match and SDKs would reject the response.
347        let checksum_enabled = checksum_mode
348            .as_ref()
349            .is_some_and(|m| m.as_str() == "ENABLED");
350        let cksum = if checksum_enabled && range.is_none() {
351            obj_checksum.as_ref().map(checksum_to_fields)
352        } else {
353            None
354        };
355        let output = GetObjectOutput {
356            accept_ranges: Some("bytes".to_owned()),
357            body: Some(body),
358            cache_control: override_cache_control.or(obj_meta.cache_control),
359            checksum_crc32: cksum.as_ref().and_then(|c| c.crc32.clone()),
360            checksum_crc32c: cksum.as_ref().and_then(|c| c.crc32c.clone()),
361            checksum_crc64nvme: cksum.as_ref().and_then(|c| c.crc64nvme.clone()),
362            checksum_sha1: cksum.as_ref().and_then(|c| c.sha1.clone()),
363            checksum_sha256: cksum.as_ref().and_then(|c| c.sha256.clone()),
364            checksum_type: cksum.as_ref().and_then(|c| c.checksum_type.clone()),
365            content_disposition: override_content_disposition.or(obj_meta.content_disposition),
366            content_encoding: override_content_encoding.or(obj_meta.content_encoding),
367            content_language: override_content_language.or(obj_meta.content_language),
368            content_length: Some(content_length),
369            content_range,
370            content_type: override_content_type.or(content_type),
371            expires: override_expires.map(|dt| dt.to_rfc2822()),
372            e_tag: Some(obj_etag),
373            last_modified: Some(obj_last_modified),
374            metadata,
375            object_lock_legal_hold_status: obj_meta
376                .object_lock_legal_hold
377                .filter(|&v| v)
378                .map(|_| ObjectLockLegalHoldStatus::from("ON")),
379            object_lock_mode: obj_meta
380                .object_lock_mode
381                .as_deref()
382                .map(ObjectLockMode::from),
383            object_lock_retain_until_date: obj_meta.object_lock_retain_until,
384            parts_count: obj_parts_count.map(|n| n as i32),
385            sse_customer_algorithm: obj_meta.sse_customer_algorithm,
386            sse_customer_key_md5: obj_meta.sse_customer_key_md5,
387            ssekms_key_id: obj_meta.sse_kms_key_id,
388            server_side_encryption: obj_meta
389                .sse_algorithm
390                .as_deref()
391                .map(ServerSideEncryption::from),
392            storage_class: Some(StorageClass::from(obj_storage_class.as_str())),
393            version_id: obj_version_id,
394            ..GetObjectOutput::default()
395        };
396        Ok(output)
397    }
398
399    /// Head object (get metadata without body).
400    #[allow(clippy::too_many_lines)]
401    pub async fn handle_head_object(
402        &self,
403        input: HeadObjectInput,
404    ) -> Result<HeadObjectOutput, S3Error> {
405        let bucket_name = input.bucket;
406        let key = input.key;
407        let version_id_param = input.version_id;
408        let checksum_mode = input.checksum_mode;
409
410        // S3 response header overrides (from query parameters in presigned URLs).
411        let override_cache_control = input.response_cache_control;
412        let override_content_disposition = input.response_content_disposition;
413        let override_content_encoding = input.response_content_encoding;
414        let override_content_language = input.response_content_language;
415        let override_content_type = input.response_content_type;
416        let override_expires = input.response_expires;
417
418        let bucket = self
419            .state
420            .get_bucket(&bucket_name)
421            .map_err(S3ServiceError::into_s3_error)?;
422
423        let store = bucket.objects.read();
424        let obj = if let Some(ref version_id) = version_id_param {
425            store.get_version(&key, version_id).ok_or_else(|| {
426                if store.is_delete_marker(&key, version_id) {
427                    S3ServiceError::MethodNotAllowed
428                        .into_s3_error()
429                        .with_header("x-amz-delete-marker", "true")
430                        .with_header("x-amz-version-id", version_id.clone())
431                } else {
432                    S3ServiceError::NoSuchVersion {
433                        key: key.clone(),
434                        version_id: version_id.clone(),
435                    }
436                    .into_s3_error()
437                }
438            })?
439        } else {
440            store
441                .get(&key)
442                .ok_or_else(|| S3ServiceError::NoSuchKey { key: key.clone() }.into_s3_error())?
443        };
444
445        let obj_version_id = if obj.version_id == "null" {
446            None
447        } else {
448            Some(obj.version_id.clone())
449        };
450
451        let content_type = Some(
452            obj.metadata
453                .content_type
454                .clone()
455                .unwrap_or_else(|| "binary/octet-stream".to_owned()),
456        );
457
458        let metadata = if obj.metadata.user_metadata.is_empty() {
459            HashMap::default()
460        } else {
461            obj.metadata.user_metadata.clone()
462        };
463
464        // Only return checksums when ChecksumMode=ENABLED (matching AWS behavior).
465        let checksum_enabled = checksum_mode
466            .as_ref()
467            .is_some_and(|m| m.as_str() == "ENABLED");
468        let cksum = if checksum_enabled {
469            obj.checksum.as_ref().map(checksum_to_fields)
470        } else {
471            None
472        };
473        let output = HeadObjectOutput {
474            accept_ranges: Some("bytes".to_owned()),
475            cache_control: override_cache_control.or(obj.metadata.cache_control.clone()),
476            checksum_crc32: cksum.as_ref().and_then(|c| c.crc32.clone()),
477            checksum_crc32c: cksum.as_ref().and_then(|c| c.crc32c.clone()),
478            checksum_crc64nvme: cksum.as_ref().and_then(|c| c.crc64nvme.clone()),
479            checksum_sha1: cksum.as_ref().and_then(|c| c.sha1.clone()),
480            checksum_sha256: cksum.as_ref().and_then(|c| c.sha256.clone()),
481            checksum_type: cksum.as_ref().and_then(|c| c.checksum_type.clone()),
482            content_disposition: override_content_disposition
483                .or(obj.metadata.content_disposition.clone()),
484            content_encoding: override_content_encoding.or(obj.metadata.content_encoding.clone()),
485            content_language: override_content_language.or(obj.metadata.content_language.clone()),
486            content_length: Some(obj.size as i64),
487            content_type: override_content_type.or(content_type),
488            expires: override_expires.map(|dt| dt.to_rfc2822()),
489            e_tag: Some(obj.etag.clone()),
490            last_modified: Some(obj.last_modified),
491            metadata,
492            object_lock_legal_hold_status: obj
493                .metadata
494                .object_lock_legal_hold
495                .filter(|&v| v)
496                .map(|_| ObjectLockLegalHoldStatus::from("ON")),
497            object_lock_mode: obj
498                .metadata
499                .object_lock_mode
500                .as_deref()
501                .map(ObjectLockMode::from),
502            object_lock_retain_until_date: obj.metadata.object_lock_retain_until,
503            parts_count: obj.parts_count.map(|n| n as i32),
504            sse_customer_algorithm: obj.metadata.sse_customer_algorithm.clone(),
505            sse_customer_key_md5: obj.metadata.sse_customer_key_md5.clone(),
506            ssekms_key_id: obj.metadata.sse_kms_key_id.clone(),
507            server_side_encryption: obj
508                .metadata
509                .sse_algorithm
510                .as_deref()
511                .map(ServerSideEncryption::from),
512            storage_class: Some(StorageClass::from(obj.storage_class.as_str())),
513            version_id: obj_version_id,
514            ..HeadObjectOutput::default()
515        };
516        Ok(output)
517    }
518
519    /// Delete a single object.
520    pub async fn handle_delete_object(
521        &self,
522        input: DeleteObjectInput,
523    ) -> Result<DeleteObjectOutput, S3Error> {
524        let bucket_name = input.bucket;
525        let key = input.key;
526
527        let bucket = self
528            .state
529            .get_bucket(&bucket_name)
530            .map_err(S3ServiceError::into_s3_error)?;
531
532        let (delete_marker_version_id, version_id_to_remove) =
533            if let Some(version_id) = &input.version_id {
534                // Delete a specific version.
535                let bypass = input.bypass_governance_retention.unwrap_or(false);
536                let mut store = bucket.objects.write();
537                check_object_lock_for_delete(&store, &key, version_id, bypass)?;
538                let removed = store.delete_version(&key, version_id);
539                if let Some(ref version) = removed {
540                    self.storage
541                        .delete_object(&bucket_name, &key, version.version_id());
542                }
543                let is_dm = removed
544                    .as_ref()
545                    .is_some_and(crate::state::object::ObjectVersion::is_delete_marker);
546                (is_dm, removed.map(|v| v.version_id().to_owned()))
547            } else {
548                // Delete without version: in versioned bucket, create delete marker.
549                let mut store = bucket.objects.write();
550                let (dm_id, _had) = store.delete_versioned(&key, &InternalOwner::default());
551                if dm_id.is_none() {
552                    // Un-versioned bucket: remove the storage data.
553                    self.storage.delete_object(&bucket_name, &key, "null");
554                }
555                (dm_id.is_some(), dm_id)
556            };
557
558        debug!(bucket = %bucket_name, key = %key, "delete_object completed");
559
560        Ok(DeleteObjectOutput {
561            delete_marker: if delete_marker_version_id {
562                Some(true)
563            } else {
564                None
565            },
566            request_charged: None,
567            version_id: version_id_to_remove,
568        })
569    }
570
571    /// Delete multiple objects (bulk delete).
572    pub async fn handle_delete_objects(
573        &self,
574        input: DeleteObjectsInput,
575    ) -> Result<DeleteObjectsOutput, S3Error> {
576        let bucket_name = input.bucket;
577
578        let bucket = self
579            .state
580            .get_bucket(&bucket_name)
581            .map_err(S3ServiceError::into_s3_error)?;
582
583        let bypass = input.bypass_governance_retention.unwrap_or(false);
584        let delete_request = input.delete;
585
586        let objects = delete_request.objects;
587        let quiet = delete_request.quiet.unwrap_or(false);
588
589        let mut deleted: Vec<DeletedObject> = Vec::with_capacity(objects.len());
590        let mut errors: Vec<rustack_s3_model::types::Error> = Vec::new();
591
592        for obj_id in objects {
593            let key = obj_id.key;
594            let version_id = obj_id.version_id;
595
596            if let Some(ref vid) = version_id {
597                // Delete a specific version.
598                let mut store = bucket.objects.write();
599                if let Err(lock_err) = check_object_lock_for_delete(&store, &key, vid, bypass) {
600                    errors.push(rustack_s3_model::types::Error {
601                        code: Some(lock_err.code.as_str().to_owned()),
602                        key: Some(key),
603                        message: Some(lock_err.message),
604                        version_id: Some(vid.clone()),
605                    });
606                    continue;
607                }
608                let removed = store.delete_version(&key, vid);
609                if let Some(ref version) = removed {
610                    self.storage
611                        .delete_object(&bucket_name, &key, version.version_id());
612                }
613                let is_dm = removed
614                    .as_ref()
615                    .is_some_and(crate::state::object::ObjectVersion::is_delete_marker);
616                deleted.push(DeletedObject {
617                    delete_marker: if is_dm { Some(true) } else { None },
618                    delete_marker_version_id: if is_dm { Some(vid.clone()) } else { None },
619                    key: Some(key),
620                    version_id: Some(vid.clone()),
621                });
622            } else {
623                // Delete without version.
624                let mut store = bucket.objects.write();
625                let (dm_id, _had) = store.delete_versioned(&key, &InternalOwner::default());
626                if dm_id.is_none() {
627                    self.storage.delete_object(&bucket_name, &key, "null");
628                }
629                deleted.push(DeletedObject {
630                    delete_marker: dm_id.as_ref().map(|_| true),
631                    delete_marker_version_id: dm_id.clone(),
632                    key: Some(key),
633                    version_id: dm_id,
634                });
635            }
636        }
637
638        debug!(
639            bucket = %bucket_name,
640            deleted_count = deleted.len(),
641            error_count = errors.len(),
642            "delete_objects completed"
643        );
644
645        Ok(DeleteObjectsOutput {
646            deleted: if quiet { Vec::new() } else { deleted },
647            errors,
648            request_charged: None,
649        })
650    }
651
652    /// Copy an object from a source to a destination.
653    #[allow(clippy::too_many_lines)]
654    pub async fn handle_copy_object(
655        &self,
656        input: CopyObjectInput,
657    ) -> Result<CopyObjectOutput, S3Error> {
658        let dst_bucket = input.bucket.clone();
659        let dst_key = input.key.clone();
660
661        validate_object_key(&dst_key).map_err(S3ServiceError::into_s3_error)?;
662
663        let (src_bucket, src_key, src_version_id) =
664            parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
665
666        // Look up source object to get its metadata.
667        // Keep this entire block synchronous -- no awaits while the lock is held.
668        let (src_metadata, src_checksum, src_version_for_storage) = {
669            let src_bucket_ref = self
670                .state
671                .get_bucket(&src_bucket)
672                .map_err(S3ServiceError::into_s3_error)?;
673
674            let src_store = src_bucket_ref.objects.read();
675            let src_obj = if let Some(ref vid) = src_version_id {
676                src_store.get_version(&src_key, vid).ok_or_else(|| {
677                    S3ServiceError::NoSuchVersion {
678                        key: src_key.clone(),
679                        version_id: vid.clone(),
680                    }
681                    .into_s3_error()
682                })?
683            } else {
684                src_store.get(&src_key).ok_or_else(|| {
685                    S3ServiceError::NoSuchKey {
686                        key: src_key.clone(),
687                    }
688                    .into_s3_error()
689                })?
690            };
691
692            (
693                src_obj.metadata.clone(),
694                src_obj.checksum.clone(),
695                src_obj.version_id.clone(),
696            )
697        };
698
699        // Determine destination versioning.
700        let dst_bucket_ref = self
701            .state
702            .get_bucket(&dst_bucket)
703            .map_err(S3ServiceError::into_s3_error)?;
704
705        let dst_version_id = if dst_bucket_ref.is_versioning_enabled() {
706            crate::utils::generate_version_id()
707        } else {
708            "null".to_owned()
709        };
710
711        // Drop the bucket ref before await to avoid holding it across await points.
712        drop(dst_bucket_ref);
713
714        // Copy storage data.
715        let write_result = self
716            .storage
717            .copy_object(
718                &src_bucket,
719                &src_key,
720                &src_version_for_storage,
721                &dst_bucket,
722                &dst_key,
723                &dst_version_id,
724            )
725            .await
726            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
727
728        // Determine metadata: use source metadata unless MetadataDirective is REPLACE.
729        let metadata = if input
730            .metadata_directive
731            .as_ref()
732            .is_some_and(|d| *d == MetadataDirective::Replace)
733        {
734            build_metadata_for_copy(&input)
735        } else {
736            src_metadata
737        };
738
739        let storage_class = input
740            .storage_class
741            .as_ref()
742            .map_or_else(|| "STANDARD".to_owned(), StorageClass::as_str_owned);
743
744        let now = Utc::now();
745        let dst_obj = S3Object {
746            key: dst_key.clone(),
747            version_id: dst_version_id.clone(),
748            etag: write_result.etag.clone(),
749            size: write_result.size,
750            last_modified: now,
751            storage_class,
752            metadata,
753            owner: InternalOwner::default(),
754            checksum: src_checksum,
755            parts_count: None,
756            part_etags: Vec::new(),
757        };
758
759        // Re-acquire the bucket ref to store the object.
760        let dst_bucket_ref = self
761            .state
762            .get_bucket(&dst_bucket)
763            .map_err(S3ServiceError::into_s3_error)?;
764        {
765            let mut store = dst_bucket_ref.objects.write();
766            store.put(dst_obj);
767        }
768
769        debug!(
770            src_bucket = %src_bucket,
771            src_key = %src_key,
772            dst_bucket = %dst_bucket,
773            dst_key = %dst_key,
774            "copy_object completed"
775        );
776
777        let real_version_id = if dst_version_id == "null" {
778            None
779        } else {
780            Some(dst_version_id)
781        };
782
783        let copy_result = CopyObjectResult {
784            e_tag: Some(write_result.etag),
785            last_modified: Some(now),
786            ..CopyObjectResult::default()
787        };
788
789        Ok(CopyObjectOutput {
790            copy_object_result: Some(copy_result),
791            copy_source_version_id: src_version_id,
792            version_id: real_version_id,
793            ..CopyObjectOutput::default()
794        })
795    }
796}
797
798// ---------------------------------------------------------------------------
799// Helpers
800// ---------------------------------------------------------------------------
801
802/// Helper trait to get an owned string from a [`StorageClass`] reference.
803///
804/// This avoids closure type inference issues when calling `as_str()` through
805/// `Option::map_or_else`.
806trait AsStrOwned {
807    /// Return `as_str().to_owned()`.
808    fn as_str_owned(&self) -> String;
809}
810
811impl AsStrOwned for StorageClass {
812    fn as_str_owned(&self) -> String {
813        self.as_str().to_owned()
814    }
815}
816
817/// Build [`ObjectMetadata`] from a [`PutObjectInput`].
818fn build_metadata(input: &PutObjectInput) -> ObjectMetadata {
819    let user_metadata = input.metadata.clone();
820
821    // Parse tagging from the x-amz-tagging header.
822    let tagging = input
823        .tagging
824        .as_deref()
825        .map(parse_tagging_header)
826        .unwrap_or_default();
827
828    let acl = parse_acl(input.acl.as_ref());
829
830    ObjectMetadata {
831        content_type: input.content_type.clone(),
832        content_encoding: input.content_encoding.clone(),
833        content_disposition: input.content_disposition.clone(),
834        content_language: input.content_language.clone(),
835        cache_control: input.cache_control.clone(),
836        expires: input.expires.clone(),
837        user_metadata,
838        sse_algorithm: input
839            .server_side_encryption
840            .as_ref()
841            .map(|sse: &ServerSideEncryption| sse.as_str().to_owned()),
842        sse_kms_key_id: input.ssekms_key_id.clone(),
843        sse_bucket_key_enabled: input.bucket_key_enabled,
844        sse_customer_algorithm: input.sse_customer_algorithm.clone(),
845        sse_customer_key_md5: input.sse_customer_key_md5.clone(),
846        tagging,
847        acl,
848        object_lock_mode: input
849            .object_lock_mode
850            .as_ref()
851            .map(|m: &ObjectLockMode| m.as_str().to_owned()),
852        object_lock_retain_until: input.object_lock_retain_until_date,
853        object_lock_legal_hold: input
854            .object_lock_legal_hold_status
855            .as_ref()
856            .map(|s: &ObjectLockLegalHoldStatus| s.as_str() == "ON"),
857    }
858}
859
860/// Build [`ObjectMetadata`] for a copy operation with REPLACE directive.
861fn build_metadata_for_copy(input: &CopyObjectInput) -> ObjectMetadata {
862    let user_metadata = input.metadata.clone();
863
864    let tagging = input
865        .tagging
866        .as_deref()
867        .map(parse_tagging_header)
868        .unwrap_or_default();
869
870    let acl = parse_acl(input.acl.as_ref());
871
872    ObjectMetadata {
873        content_type: input.content_type.clone(),
874        content_encoding: input.content_encoding.clone(),
875        content_disposition: input.content_disposition.clone(),
876        content_language: input.content_language.clone(),
877        cache_control: input.cache_control.clone(),
878        expires: None,
879        user_metadata,
880        sse_algorithm: input
881            .server_side_encryption
882            .as_ref()
883            .map(|sse: &ServerSideEncryption| sse.as_str().to_owned()),
884        sse_kms_key_id: input.ssekms_key_id.clone(),
885        sse_bucket_key_enabled: input.bucket_key_enabled,
886        sse_customer_algorithm: input.sse_customer_algorithm.clone(),
887        sse_customer_key_md5: input.sse_customer_key_md5.clone(),
888        tagging,
889        acl,
890        object_lock_mode: input
891            .object_lock_mode
892            .as_ref()
893            .map(|m: &ObjectLockMode| m.as_str().to_owned()),
894        object_lock_retain_until: input.object_lock_retain_until_date,
895        object_lock_legal_hold: input
896            .object_lock_legal_hold_status
897            .as_ref()
898            .map(|s: &ObjectLockLegalHoldStatus| s.as_str() == "ON"),
899    }
900}
901
902/// Parse an optional [`ObjectCannedACL`] into our internal [`CannedAcl`].
903fn parse_acl(acl: Option<&ObjectCannedACL>) -> CannedAcl {
904    acl.and_then(|a| a.as_str().parse::<CannedAcl>().ok())
905        .unwrap_or_default()
906}
907
908/// Parse the `x-amz-tagging` URL-encoded query string into tag pairs.
909pub(super) fn parse_tagging_header(tagging: &str) -> Vec<(String, String)> {
910    tagging
911        .split('&')
912        .filter(|s| !s.is_empty())
913        .filter_map(|pair| {
914            let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
915            let key = percent_encoding::percent_decode_str(k)
916                .decode_utf8()
917                .ok()?
918                .to_string();
919            let value = percent_encoding::percent_decode_str(v)
920                .decode_utf8()
921                .ok()?
922                .to_string();
923            Some((key, value))
924        })
925        .collect()
926}
927
928/// Holds the individual checksum fields for populating output structs.
929struct ChecksumFields {
930    crc32: Option<String>,
931    crc32c: Option<String>,
932    crc64nvme: Option<String>,
933    sha1: Option<String>,
934    sha256: Option<String>,
935    checksum_type: Option<ChecksumType>,
936}
937
938/// Map a [`ChecksumData`] to individual output fields.
939fn checksum_to_fields(checksum: &ChecksumData) -> ChecksumFields {
940    let mut fields = ChecksumFields {
941        crc32: None,
942        crc32c: None,
943        crc64nvme: None,
944        sha1: None,
945        sha256: None,
946        checksum_type: None,
947    };
948    match checksum.algorithm.as_str() {
949        "CRC32" => fields.crc32 = Some(checksum.value.clone()),
950        "CRC32C" => fields.crc32c = Some(checksum.value.clone()),
951        "CRC64NVME" => fields.crc64nvme = Some(checksum.value.clone()),
952        "SHA1" => fields.sha1 = Some(checksum.value.clone()),
953        "SHA256" => fields.sha256 = Some(checksum.value.clone()),
954        _ => {}
955    }
956    fields.checksum_type = Some(match checksum.checksum_type.as_str() {
957        "COMPOSITE" => ChecksumType::Composite,
958        _ => ChecksumType::FullObject,
959    });
960    fields
961}
962
963/// Extract checksum data from a [`PutObjectInput`] if any checksum fields are
964/// set.
965///
966/// Returns an error if multiple checksum fields are set (only one is allowed
967/// per request, matching AWS behavior).
968fn extract_checksum_from_put(
969    input: &PutObjectInput,
970) -> Result<Option<ChecksumData>, S3ServiceError> {
971    let candidates: [(&str, &Option<String>); 5] = [
972        ("CRC32", &input.checksum_crc32),
973        ("CRC32C", &input.checksum_crc32c),
974        ("CRC64NVME", &input.checksum_crc64nvme),
975        ("SHA1", &input.checksum_sha1),
976        ("SHA256", &input.checksum_sha256),
977    ];
978    let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
979    if found.len() > 1 {
980        return Err(S3ServiceError::InvalidArgument {
981            message: "Only one checksum value can be provided per request".to_owned(),
982        });
983    }
984    Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
985        algorithm: (*alg).to_owned(),
986        value: val.as_ref().unwrap_or(&String::new()).clone(),
987        checksum_type: "FULL_OBJECT".to_owned(),
988    }))
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_parse_copy_source_simple() {
        let (bucket, key, vid) = parse_copy_source("my-bucket/my-key").unwrap();
        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert!(vid.is_none());
    }

    #[test]
    fn test_should_parse_copy_source_with_leading_slash() {
        let (bucket, key, vid) = parse_copy_source("/my-bucket/my-key").unwrap();
        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert!(vid.is_none());
    }

    #[test]
    fn test_should_parse_copy_source_with_version_id() {
        let (bucket, key, vid) = parse_copy_source("/my-bucket/my-key?versionId=abc123").unwrap();
        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert_eq!(vid.as_deref(), Some("abc123"));
    }

    #[test]
    fn test_should_parse_copy_source_with_nested_key() {
        let (bucket, key, vid) = parse_copy_source("bucket/path/to/key").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(key, "path/to/key");
        assert!(vid.is_none());
    }

    #[test]
    fn test_should_parse_copy_source_with_encoded_key() {
        let (bucket, key, vid) = parse_copy_source("bucket/path%20to/key%2B1").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(key, "path to/key+1");
        assert!(vid.is_none());
    }

    #[test]
    fn test_should_reject_copy_source_no_key() {
        assert!(parse_copy_source("bucket-only").is_err());
    }

    #[test]
    fn test_should_reject_copy_source_empty_bucket() {
        assert!(parse_copy_source("/").is_err());
    }

    #[test]
    fn test_should_reject_copy_source_empty_key() {
        assert!(parse_copy_source("bucket/").is_err());
    }

    #[test]
    fn test_should_parse_tagging_header_basic() {
        let tags = parse_tagging_header("key1=value1&key2=value2");
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0], ("key1".to_owned(), "value1".to_owned()));
        assert_eq!(tags[1], ("key2".to_owned(), "value2".to_owned()));
    }

    #[test]
    fn test_should_parse_tagging_header_encoded() {
        let tags = parse_tagging_header("key%201=value%201");
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0], ("key 1".to_owned(), "value 1".to_owned()));
    }

    #[test]
    fn test_should_parse_tagging_header_empty() {
        let tags = parse_tagging_header("");
        assert!(tags.is_empty());
    }

    #[test]
    fn test_should_parse_tagging_header_no_value() {
        let tags = parse_tagging_header("key1");
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0], ("key1".to_owned(), String::new()));
    }

    /// Doubled and trailing `&` separators must not produce phantom tags.
    #[test]
    fn test_should_parse_tagging_header_skip_empty_segments() {
        let tags = parse_tagging_header("a=1&&b=2&");
        assert_eq!(
            tags,
            vec![
                ("a".to_owned(), "1".to_owned()),
                ("b".to_owned(), "2".to_owned()),
            ]
        );
    }

    /// Only the first `=` separates key from value; later ones are kept.
    #[test]
    fn test_should_parse_tagging_header_split_on_first_equals() {
        let tags = parse_tagging_header("k=a=b");
        assert_eq!(tags, vec![("k".to_owned(), "a=b".to_owned())]);
    }

    /// A pair that does not decode to valid UTF-8 is dropped, and its
    /// neighbours are kept.
    #[test]
    fn test_should_parse_tagging_header_drop_invalid_utf8() {
        let tags = parse_tagging_header("bad=%FF&good=ok");
        assert_eq!(tags, vec![("good".to_owned(), "ok".to_owned())]);
    }
}