1use std::{collections::HashMap, str::FromStr};
7
8use bytes::Bytes;
9use chrono::Utc;
10use rustack_s3_model::{
11 error::{S3Error, S3ErrorCode},
12 input::{
13 CopyObjectInput, DeleteObjectInput, DeleteObjectsInput, GetObjectInput, HeadObjectInput,
14 PutObjectInput,
15 },
16 output::{
17 CopyObjectOutput, DeleteObjectOutput, DeleteObjectsOutput, GetObjectOutput,
18 HeadObjectOutput, PutObjectOutput,
19 },
20 request::StreamingBlob,
21 types::{
22 ChecksumType, CopyObjectResult, DeletedObject, MetadataDirective, ObjectCannedACL,
23 ObjectLockLegalHoldStatus, ObjectLockMode, ServerSideEncryption, StorageClass,
24 },
25};
26use tracing::debug;
27
28use crate::{
29 checksums::{ChecksumAlgorithm, compute_checksum},
30 error::S3ServiceError,
31 provider::RustackS3,
32 state::{
33 keystore::ObjectStore,
34 object::{CannedAcl, ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
35 },
36 utils::{is_valid_if_match, is_valid_if_none_match, parse_copy_source, parse_range_header},
37 validation::{validate_content_md5, validate_metadata, validate_object_key},
38};
39
40#[allow(clippy::result_large_err)]
57fn check_object_lock_for_delete(
58 store: &ObjectStore,
59 key: &str,
60 version_id: &str,
61 bypass_governance: bool,
62) -> Result<(), S3Error> {
63 let Some(obj) = store.get_version(key, version_id) else {
64 return Ok(());
66 };
67
68 if obj.metadata.object_lock_legal_hold == Some(true) {
69 return Err(S3Error::with_message(
70 S3ErrorCode::AccessDenied,
71 "Object Lock legal hold is enabled on this object",
72 ));
73 }
74
75 if let Some(retain_until) = obj.metadata.object_lock_retain_until {
76 if retain_until > Utc::now() {
77 let is_governance = obj.metadata.object_lock_mode.as_deref() == Some("GOVERNANCE");
78 if is_governance && bypass_governance {
79 } else {
81 return Err(S3Error::with_message(
82 S3ErrorCode::AccessDenied,
83 "Object Lock retention period has not expired",
84 ));
85 }
86 }
87 }
88
89 Ok(())
90}
91
92#[allow(
97 clippy::cast_possible_wrap,
98 clippy::cast_possible_truncation,
99 clippy::cast_sign_loss,
100 clippy::unused_async
101)]
102impl RustackS3 {
103 pub async fn handle_put_object(
105 &self,
106 mut input: PutObjectInput,
107 ) -> Result<PutObjectOutput, S3Error> {
108 let bucket_name = input.bucket.clone();
109 let key = input.key.clone();
110
111 validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
112
113 let bucket = self
115 .state
116 .get_bucket(&bucket_name)
117 .map_err(S3ServiceError::into_s3_error)?;
118
119 let body_data = input.body.take().map_or_else(Bytes::new, |b| b.data);
121
122 validate_content_md5(input.content_md5.as_deref(), &body_data)
124 .map_err(S3ServiceError::into_s3_error)?;
125
126 let metadata = build_metadata(&input);
128 validate_metadata(&metadata.user_metadata).map_err(S3ServiceError::into_s3_error)?;
129
130 let version_id = if bucket.is_versioning_enabled() {
132 crate::utils::generate_version_id()
133 } else {
134 "null".to_owned()
135 };
136
137 let write_result = self
139 .storage
140 .write_object(&bucket_name, &key, &version_id, body_data.clone())
141 .await
142 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
143
144 let client_checksum =
146 extract_checksum_from_put(&input).map_err(S3ServiceError::into_s3_error)?;
147 let is_client_provided = client_checksum.is_some();
148
149 let checksum = client_checksum.unwrap_or_else(|| ChecksumData {
150 algorithm: "CRC32".to_owned(),
151 value: compute_checksum(ChecksumAlgorithm::Crc32, &body_data),
152 checksum_type: "FULL_OBJECT".to_owned(),
153 });
154
155 if is_client_provided {
157 if let Ok(algo) = ChecksumAlgorithm::from_str(&checksum.algorithm) {
158 let computed = compute_checksum(algo, &body_data);
159 if checksum.value != computed {
160 return Err(S3ServiceError::BadDigest.into_s3_error());
161 }
162 }
163 }
164
165 let owner = InternalOwner::default();
167 let obj = S3Object {
168 key: key.clone(),
169 version_id: version_id.clone(),
170 etag: write_result.etag.clone(),
171 size: write_result.size,
172 last_modified: Utc::now(),
173 storage_class: input
174 .storage_class
175 .as_ref()
176 .map_or_else(|| "STANDARD".to_owned(), StorageClass::as_str_owned),
177 metadata,
178 owner,
179 checksum: Some(checksum.clone()),
180 parts_count: None,
181 part_etags: Vec::new(),
182 };
183
184 {
186 let mut store = bucket.objects.write();
187 store.put(obj);
188 }
189
190 debug!(bucket = %bucket_name, key = %key, version_id = %version_id, "put_object completed");
191
192 let real_version_id = if version_id == "null" {
193 None
194 } else {
195 Some(version_id)
196 };
197
198 let cksum = checksum_to_fields(&checksum);
199 Ok(PutObjectOutput {
200 e_tag: Some(write_result.etag),
201 version_id: real_version_id,
202 checksum_crc32: cksum.crc32,
203 checksum_crc32c: cksum.crc32c,
204 checksum_crc64nvme: cksum.crc64nvme,
205 checksum_sha1: cksum.sha1,
206 checksum_sha256: cksum.sha256,
207 checksum_type: cksum.checksum_type,
208 ..PutObjectOutput::default()
209 })
210 }
211
212 #[allow(clippy::too_many_lines)]
214 pub async fn handle_get_object(
215 &self,
216 input: GetObjectInput,
217 ) -> Result<GetObjectOutput, S3Error> {
218 let bucket_name = input.bucket;
219 let key = input.key;
220 let version_id_param = input.version_id;
221 let if_match_param = input.if_match;
222 let if_none_match_param = input.if_none_match;
223 let range_param = input.range;
224 let checksum_mode = input.checksum_mode;
225
226 let override_cache_control = input.response_cache_control;
228 let override_content_disposition = input.response_content_disposition;
229 let override_content_encoding = input.response_content_encoding;
230 let override_content_language = input.response_content_language;
231 let override_content_type = input.response_content_type;
232 let override_expires = input.response_expires;
233
234 let (
238 obj_size,
239 obj_etag,
240 obj_last_modified,
241 obj_version_id,
242 obj_storage_class,
243 obj_meta,
244 obj_parts_count,
245 obj_checksum,
246 version_for_storage,
247 ) = {
248 let bucket = self
249 .state
250 .get_bucket(&bucket_name)
251 .map_err(S3ServiceError::into_s3_error)?;
252
253 let store = bucket.objects.read();
254 let obj = if let Some(ref version_id) = version_id_param {
255 store.get_version(&key, version_id).ok_or_else(|| {
256 if store.is_delete_marker(&key, version_id) {
258 S3ServiceError::MethodNotAllowed
259 .into_s3_error()
260 .with_header("x-amz-delete-marker", "true")
261 .with_header("x-amz-version-id", version_id.clone())
262 } else {
263 S3ServiceError::NoSuchVersion {
264 key: key.clone(),
265 version_id: version_id.clone(),
266 }
267 .into_s3_error()
268 }
269 })?
270 } else {
271 store
272 .get(&key)
273 .ok_or_else(|| S3ServiceError::NoSuchKey { key: key.clone() }.into_s3_error())?
274 };
275
276 if let Some(ref if_match) = if_match_param {
278 if !is_valid_if_match(&obj.etag, if_match) {
279 return Err(S3ServiceError::PreconditionFailed.into_s3_error());
280 }
281 }
282 if let Some(ref if_none_match) = if_none_match_param {
283 if !is_valid_if_none_match(&obj.etag, if_none_match) {
284 return Err(S3ServiceError::NotModified.into_s3_error());
285 }
286 }
287
288 let version_id_opt = if obj.version_id == "null" {
289 None
290 } else {
291 Some(obj.version_id.clone())
292 };
293
294 (
295 obj.size,
296 obj.etag.clone(),
297 obj.last_modified,
298 version_id_opt,
299 obj.storage_class.clone(),
300 obj.metadata.clone(),
301 obj.parts_count,
302 obj.checksum.clone(),
303 obj.version_id.clone(),
304 )
305 };
306
307 let range = if let Some(ref range_value) = range_param {
309 let (start, end) =
310 parse_range_header(range_value, obj_size).map_err(S3ServiceError::into_s3_error)?;
311 Some((start, end))
312 } else {
313 None
314 };
315
316 let data = self
318 .storage
319 .read_object(&bucket_name, &key, &version_for_storage, range)
320 .await
321 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
322
323 let content_length = data.len() as i64;
324
325 let body = StreamingBlob::new(data);
327
328 let content_range = range.map(|(start, end)| format!("bytes {start}-{end}/{obj_size}"));
329
330 let content_type = Some(
331 obj_meta
332 .content_type
333 .clone()
334 .unwrap_or_else(|| "binary/octet-stream".to_owned()),
335 );
336
337 let metadata = if obj_meta.user_metadata.is_empty() {
338 HashMap::default()
339 } else {
340 obj_meta.user_metadata.clone()
341 };
342
343 let checksum_enabled = checksum_mode
348 .as_ref()
349 .is_some_and(|m| m.as_str() == "ENABLED");
350 let cksum = if checksum_enabled && range.is_none() {
351 obj_checksum.as_ref().map(checksum_to_fields)
352 } else {
353 None
354 };
355 let output = GetObjectOutput {
356 accept_ranges: Some("bytes".to_owned()),
357 body: Some(body),
358 cache_control: override_cache_control.or(obj_meta.cache_control),
359 checksum_crc32: cksum.as_ref().and_then(|c| c.crc32.clone()),
360 checksum_crc32c: cksum.as_ref().and_then(|c| c.crc32c.clone()),
361 checksum_crc64nvme: cksum.as_ref().and_then(|c| c.crc64nvme.clone()),
362 checksum_sha1: cksum.as_ref().and_then(|c| c.sha1.clone()),
363 checksum_sha256: cksum.as_ref().and_then(|c| c.sha256.clone()),
364 checksum_type: cksum.as_ref().and_then(|c| c.checksum_type.clone()),
365 content_disposition: override_content_disposition.or(obj_meta.content_disposition),
366 content_encoding: override_content_encoding.or(obj_meta.content_encoding),
367 content_language: override_content_language.or(obj_meta.content_language),
368 content_length: Some(content_length),
369 content_range,
370 content_type: override_content_type.or(content_type),
371 expires: override_expires.map(|dt| dt.to_rfc2822()),
372 e_tag: Some(obj_etag),
373 last_modified: Some(obj_last_modified),
374 metadata,
375 object_lock_legal_hold_status: obj_meta
376 .object_lock_legal_hold
377 .filter(|&v| v)
378 .map(|_| ObjectLockLegalHoldStatus::from("ON")),
379 object_lock_mode: obj_meta
380 .object_lock_mode
381 .as_deref()
382 .map(ObjectLockMode::from),
383 object_lock_retain_until_date: obj_meta.object_lock_retain_until,
384 parts_count: obj_parts_count.map(|n| n as i32),
385 sse_customer_algorithm: obj_meta.sse_customer_algorithm,
386 sse_customer_key_md5: obj_meta.sse_customer_key_md5,
387 ssekms_key_id: obj_meta.sse_kms_key_id,
388 server_side_encryption: obj_meta
389 .sse_algorithm
390 .as_deref()
391 .map(ServerSideEncryption::from),
392 storage_class: Some(StorageClass::from(obj_storage_class.as_str())),
393 version_id: obj_version_id,
394 ..GetObjectOutput::default()
395 };
396 Ok(output)
397 }
398
399 #[allow(clippy::too_many_lines)]
401 pub async fn handle_head_object(
402 &self,
403 input: HeadObjectInput,
404 ) -> Result<HeadObjectOutput, S3Error> {
405 let bucket_name = input.bucket;
406 let key = input.key;
407 let version_id_param = input.version_id;
408 let checksum_mode = input.checksum_mode;
409
410 let override_cache_control = input.response_cache_control;
412 let override_content_disposition = input.response_content_disposition;
413 let override_content_encoding = input.response_content_encoding;
414 let override_content_language = input.response_content_language;
415 let override_content_type = input.response_content_type;
416 let override_expires = input.response_expires;
417
418 let bucket = self
419 .state
420 .get_bucket(&bucket_name)
421 .map_err(S3ServiceError::into_s3_error)?;
422
423 let store = bucket.objects.read();
424 let obj = if let Some(ref version_id) = version_id_param {
425 store.get_version(&key, version_id).ok_or_else(|| {
426 if store.is_delete_marker(&key, version_id) {
427 S3ServiceError::MethodNotAllowed
428 .into_s3_error()
429 .with_header("x-amz-delete-marker", "true")
430 .with_header("x-amz-version-id", version_id.clone())
431 } else {
432 S3ServiceError::NoSuchVersion {
433 key: key.clone(),
434 version_id: version_id.clone(),
435 }
436 .into_s3_error()
437 }
438 })?
439 } else {
440 store
441 .get(&key)
442 .ok_or_else(|| S3ServiceError::NoSuchKey { key: key.clone() }.into_s3_error())?
443 };
444
445 let obj_version_id = if obj.version_id == "null" {
446 None
447 } else {
448 Some(obj.version_id.clone())
449 };
450
451 let content_type = Some(
452 obj.metadata
453 .content_type
454 .clone()
455 .unwrap_or_else(|| "binary/octet-stream".to_owned()),
456 );
457
458 let metadata = if obj.metadata.user_metadata.is_empty() {
459 HashMap::default()
460 } else {
461 obj.metadata.user_metadata.clone()
462 };
463
464 let checksum_enabled = checksum_mode
466 .as_ref()
467 .is_some_and(|m| m.as_str() == "ENABLED");
468 let cksum = if checksum_enabled {
469 obj.checksum.as_ref().map(checksum_to_fields)
470 } else {
471 None
472 };
473 let output = HeadObjectOutput {
474 accept_ranges: Some("bytes".to_owned()),
475 cache_control: override_cache_control.or(obj.metadata.cache_control.clone()),
476 checksum_crc32: cksum.as_ref().and_then(|c| c.crc32.clone()),
477 checksum_crc32c: cksum.as_ref().and_then(|c| c.crc32c.clone()),
478 checksum_crc64nvme: cksum.as_ref().and_then(|c| c.crc64nvme.clone()),
479 checksum_sha1: cksum.as_ref().and_then(|c| c.sha1.clone()),
480 checksum_sha256: cksum.as_ref().and_then(|c| c.sha256.clone()),
481 checksum_type: cksum.as_ref().and_then(|c| c.checksum_type.clone()),
482 content_disposition: override_content_disposition
483 .or(obj.metadata.content_disposition.clone()),
484 content_encoding: override_content_encoding.or(obj.metadata.content_encoding.clone()),
485 content_language: override_content_language.or(obj.metadata.content_language.clone()),
486 content_length: Some(obj.size as i64),
487 content_type: override_content_type.or(content_type),
488 expires: override_expires.map(|dt| dt.to_rfc2822()),
489 e_tag: Some(obj.etag.clone()),
490 last_modified: Some(obj.last_modified),
491 metadata,
492 object_lock_legal_hold_status: obj
493 .metadata
494 .object_lock_legal_hold
495 .filter(|&v| v)
496 .map(|_| ObjectLockLegalHoldStatus::from("ON")),
497 object_lock_mode: obj
498 .metadata
499 .object_lock_mode
500 .as_deref()
501 .map(ObjectLockMode::from),
502 object_lock_retain_until_date: obj.metadata.object_lock_retain_until,
503 parts_count: obj.parts_count.map(|n| n as i32),
504 sse_customer_algorithm: obj.metadata.sse_customer_algorithm.clone(),
505 sse_customer_key_md5: obj.metadata.sse_customer_key_md5.clone(),
506 ssekms_key_id: obj.metadata.sse_kms_key_id.clone(),
507 server_side_encryption: obj
508 .metadata
509 .sse_algorithm
510 .as_deref()
511 .map(ServerSideEncryption::from),
512 storage_class: Some(StorageClass::from(obj.storage_class.as_str())),
513 version_id: obj_version_id,
514 ..HeadObjectOutput::default()
515 };
516 Ok(output)
517 }
518
519 pub async fn handle_delete_object(
521 &self,
522 input: DeleteObjectInput,
523 ) -> Result<DeleteObjectOutput, S3Error> {
524 let bucket_name = input.bucket;
525 let key = input.key;
526
527 let bucket = self
528 .state
529 .get_bucket(&bucket_name)
530 .map_err(S3ServiceError::into_s3_error)?;
531
532 let (delete_marker_version_id, version_id_to_remove) =
533 if let Some(version_id) = &input.version_id {
534 let bypass = input.bypass_governance_retention.unwrap_or(false);
536 let mut store = bucket.objects.write();
537 check_object_lock_for_delete(&store, &key, version_id, bypass)?;
538 let removed = store.delete_version(&key, version_id);
539 if let Some(ref version) = removed {
540 self.storage
541 .delete_object(&bucket_name, &key, version.version_id());
542 }
543 let is_dm = removed
544 .as_ref()
545 .is_some_and(crate::state::object::ObjectVersion::is_delete_marker);
546 (is_dm, removed.map(|v| v.version_id().to_owned()))
547 } else {
548 let mut store = bucket.objects.write();
550 let (dm_id, _had) = store.delete_versioned(&key, &InternalOwner::default());
551 if dm_id.is_none() {
552 self.storage.delete_object(&bucket_name, &key, "null");
554 }
555 (dm_id.is_some(), dm_id)
556 };
557
558 debug!(bucket = %bucket_name, key = %key, "delete_object completed");
559
560 Ok(DeleteObjectOutput {
561 delete_marker: if delete_marker_version_id {
562 Some(true)
563 } else {
564 None
565 },
566 request_charged: None,
567 version_id: version_id_to_remove,
568 })
569 }
570
571 pub async fn handle_delete_objects(
573 &self,
574 input: DeleteObjectsInput,
575 ) -> Result<DeleteObjectsOutput, S3Error> {
576 let bucket_name = input.bucket;
577
578 let bucket = self
579 .state
580 .get_bucket(&bucket_name)
581 .map_err(S3ServiceError::into_s3_error)?;
582
583 let bypass = input.bypass_governance_retention.unwrap_or(false);
584 let delete_request = input.delete;
585
586 let objects = delete_request.objects;
587 let quiet = delete_request.quiet.unwrap_or(false);
588
589 let mut deleted: Vec<DeletedObject> = Vec::with_capacity(objects.len());
590 let mut errors: Vec<rustack_s3_model::types::Error> = Vec::new();
591
592 for obj_id in objects {
593 let key = obj_id.key;
594 let version_id = obj_id.version_id;
595
596 if let Some(ref vid) = version_id {
597 let mut store = bucket.objects.write();
599 if let Err(lock_err) = check_object_lock_for_delete(&store, &key, vid, bypass) {
600 errors.push(rustack_s3_model::types::Error {
601 code: Some(lock_err.code.as_str().to_owned()),
602 key: Some(key),
603 message: Some(lock_err.message),
604 version_id: Some(vid.clone()),
605 });
606 continue;
607 }
608 let removed = store.delete_version(&key, vid);
609 if let Some(ref version) = removed {
610 self.storage
611 .delete_object(&bucket_name, &key, version.version_id());
612 }
613 let is_dm = removed
614 .as_ref()
615 .is_some_and(crate::state::object::ObjectVersion::is_delete_marker);
616 deleted.push(DeletedObject {
617 delete_marker: if is_dm { Some(true) } else { None },
618 delete_marker_version_id: if is_dm { Some(vid.clone()) } else { None },
619 key: Some(key),
620 version_id: Some(vid.clone()),
621 });
622 } else {
623 let mut store = bucket.objects.write();
625 let (dm_id, _had) = store.delete_versioned(&key, &InternalOwner::default());
626 if dm_id.is_none() {
627 self.storage.delete_object(&bucket_name, &key, "null");
628 }
629 deleted.push(DeletedObject {
630 delete_marker: dm_id.as_ref().map(|_| true),
631 delete_marker_version_id: dm_id.clone(),
632 key: Some(key),
633 version_id: dm_id,
634 });
635 }
636 }
637
638 debug!(
639 bucket = %bucket_name,
640 deleted_count = deleted.len(),
641 error_count = errors.len(),
642 "delete_objects completed"
643 );
644
645 Ok(DeleteObjectsOutput {
646 deleted: if quiet { Vec::new() } else { deleted },
647 errors,
648 request_charged: None,
649 })
650 }
651
652 #[allow(clippy::too_many_lines)]
654 pub async fn handle_copy_object(
655 &self,
656 input: CopyObjectInput,
657 ) -> Result<CopyObjectOutput, S3Error> {
658 let dst_bucket = input.bucket.clone();
659 let dst_key = input.key.clone();
660
661 validate_object_key(&dst_key).map_err(S3ServiceError::into_s3_error)?;
662
663 let (src_bucket, src_key, src_version_id) =
664 parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
665
666 let (src_metadata, src_checksum, src_version_for_storage) = {
669 let src_bucket_ref = self
670 .state
671 .get_bucket(&src_bucket)
672 .map_err(S3ServiceError::into_s3_error)?;
673
674 let src_store = src_bucket_ref.objects.read();
675 let src_obj = if let Some(ref vid) = src_version_id {
676 src_store.get_version(&src_key, vid).ok_or_else(|| {
677 S3ServiceError::NoSuchVersion {
678 key: src_key.clone(),
679 version_id: vid.clone(),
680 }
681 .into_s3_error()
682 })?
683 } else {
684 src_store.get(&src_key).ok_or_else(|| {
685 S3ServiceError::NoSuchKey {
686 key: src_key.clone(),
687 }
688 .into_s3_error()
689 })?
690 };
691
692 (
693 src_obj.metadata.clone(),
694 src_obj.checksum.clone(),
695 src_obj.version_id.clone(),
696 )
697 };
698
699 let dst_bucket_ref = self
701 .state
702 .get_bucket(&dst_bucket)
703 .map_err(S3ServiceError::into_s3_error)?;
704
705 let dst_version_id = if dst_bucket_ref.is_versioning_enabled() {
706 crate::utils::generate_version_id()
707 } else {
708 "null".to_owned()
709 };
710
711 drop(dst_bucket_ref);
713
714 let write_result = self
716 .storage
717 .copy_object(
718 &src_bucket,
719 &src_key,
720 &src_version_for_storage,
721 &dst_bucket,
722 &dst_key,
723 &dst_version_id,
724 )
725 .await
726 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
727
728 let metadata = if input
730 .metadata_directive
731 .as_ref()
732 .is_some_and(|d| *d == MetadataDirective::Replace)
733 {
734 build_metadata_for_copy(&input)
735 } else {
736 src_metadata
737 };
738
739 let storage_class = input
740 .storage_class
741 .as_ref()
742 .map_or_else(|| "STANDARD".to_owned(), StorageClass::as_str_owned);
743
744 let now = Utc::now();
745 let dst_obj = S3Object {
746 key: dst_key.clone(),
747 version_id: dst_version_id.clone(),
748 etag: write_result.etag.clone(),
749 size: write_result.size,
750 last_modified: now,
751 storage_class,
752 metadata,
753 owner: InternalOwner::default(),
754 checksum: src_checksum,
755 parts_count: None,
756 part_etags: Vec::new(),
757 };
758
759 let dst_bucket_ref = self
761 .state
762 .get_bucket(&dst_bucket)
763 .map_err(S3ServiceError::into_s3_error)?;
764 {
765 let mut store = dst_bucket_ref.objects.write();
766 store.put(dst_obj);
767 }
768
769 debug!(
770 src_bucket = %src_bucket,
771 src_key = %src_key,
772 dst_bucket = %dst_bucket,
773 dst_key = %dst_key,
774 "copy_object completed"
775 );
776
777 let real_version_id = if dst_version_id == "null" {
778 None
779 } else {
780 Some(dst_version_id)
781 };
782
783 let copy_result = CopyObjectResult {
784 e_tag: Some(write_result.etag),
785 last_modified: Some(now),
786 ..CopyObjectResult::default()
787 };
788
789 Ok(CopyObjectOutput {
790 copy_object_result: Some(copy_result),
791 copy_source_version_id: src_version_id,
792 version_id: real_version_id,
793 ..CopyObjectOutput::default()
794 })
795 }
796}
797
798trait AsStrOwned {
807 fn as_str_owned(&self) -> String;
809}
810
811impl AsStrOwned for StorageClass {
812 fn as_str_owned(&self) -> String {
813 self.as_str().to_owned()
814 }
815}
816
817fn build_metadata(input: &PutObjectInput) -> ObjectMetadata {
819 let user_metadata = input.metadata.clone();
820
821 let tagging = input
823 .tagging
824 .as_deref()
825 .map(parse_tagging_header)
826 .unwrap_or_default();
827
828 let acl = parse_acl(input.acl.as_ref());
829
830 ObjectMetadata {
831 content_type: input.content_type.clone(),
832 content_encoding: input.content_encoding.clone(),
833 content_disposition: input.content_disposition.clone(),
834 content_language: input.content_language.clone(),
835 cache_control: input.cache_control.clone(),
836 expires: input.expires.clone(),
837 user_metadata,
838 sse_algorithm: input
839 .server_side_encryption
840 .as_ref()
841 .map(|sse: &ServerSideEncryption| sse.as_str().to_owned()),
842 sse_kms_key_id: input.ssekms_key_id.clone(),
843 sse_bucket_key_enabled: input.bucket_key_enabled,
844 sse_customer_algorithm: input.sse_customer_algorithm.clone(),
845 sse_customer_key_md5: input.sse_customer_key_md5.clone(),
846 tagging,
847 acl,
848 object_lock_mode: input
849 .object_lock_mode
850 .as_ref()
851 .map(|m: &ObjectLockMode| m.as_str().to_owned()),
852 object_lock_retain_until: input.object_lock_retain_until_date,
853 object_lock_legal_hold: input
854 .object_lock_legal_hold_status
855 .as_ref()
856 .map(|s: &ObjectLockLegalHoldStatus| s.as_str() == "ON"),
857 }
858}
859
860fn build_metadata_for_copy(input: &CopyObjectInput) -> ObjectMetadata {
862 let user_metadata = input.metadata.clone();
863
864 let tagging = input
865 .tagging
866 .as_deref()
867 .map(parse_tagging_header)
868 .unwrap_or_default();
869
870 let acl = parse_acl(input.acl.as_ref());
871
872 ObjectMetadata {
873 content_type: input.content_type.clone(),
874 content_encoding: input.content_encoding.clone(),
875 content_disposition: input.content_disposition.clone(),
876 content_language: input.content_language.clone(),
877 cache_control: input.cache_control.clone(),
878 expires: None,
879 user_metadata,
880 sse_algorithm: input
881 .server_side_encryption
882 .as_ref()
883 .map(|sse: &ServerSideEncryption| sse.as_str().to_owned()),
884 sse_kms_key_id: input.ssekms_key_id.clone(),
885 sse_bucket_key_enabled: input.bucket_key_enabled,
886 sse_customer_algorithm: input.sse_customer_algorithm.clone(),
887 sse_customer_key_md5: input.sse_customer_key_md5.clone(),
888 tagging,
889 acl,
890 object_lock_mode: input
891 .object_lock_mode
892 .as_ref()
893 .map(|m: &ObjectLockMode| m.as_str().to_owned()),
894 object_lock_retain_until: input.object_lock_retain_until_date,
895 object_lock_legal_hold: input
896 .object_lock_legal_hold_status
897 .as_ref()
898 .map(|s: &ObjectLockLegalHoldStatus| s.as_str() == "ON"),
899 }
900}
901
902fn parse_acl(acl: Option<&ObjectCannedACL>) -> CannedAcl {
904 acl.and_then(|a| a.as_str().parse::<CannedAcl>().ok())
905 .unwrap_or_default()
906}
907
908pub(super) fn parse_tagging_header(tagging: &str) -> Vec<(String, String)> {
910 tagging
911 .split('&')
912 .filter(|s| !s.is_empty())
913 .filter_map(|pair| {
914 let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
915 let key = percent_encoding::percent_decode_str(k)
916 .decode_utf8()
917 .ok()?
918 .to_string();
919 let value = percent_encoding::percent_decode_str(v)
920 .decode_utf8()
921 .ok()?
922 .to_string();
923 Some((key, value))
924 })
925 .collect()
926}
927
928struct ChecksumFields {
930 crc32: Option<String>,
931 crc32c: Option<String>,
932 crc64nvme: Option<String>,
933 sha1: Option<String>,
934 sha256: Option<String>,
935 checksum_type: Option<ChecksumType>,
936}
937
938fn checksum_to_fields(checksum: &ChecksumData) -> ChecksumFields {
940 let mut fields = ChecksumFields {
941 crc32: None,
942 crc32c: None,
943 crc64nvme: None,
944 sha1: None,
945 sha256: None,
946 checksum_type: None,
947 };
948 match checksum.algorithm.as_str() {
949 "CRC32" => fields.crc32 = Some(checksum.value.clone()),
950 "CRC32C" => fields.crc32c = Some(checksum.value.clone()),
951 "CRC64NVME" => fields.crc64nvme = Some(checksum.value.clone()),
952 "SHA1" => fields.sha1 = Some(checksum.value.clone()),
953 "SHA256" => fields.sha256 = Some(checksum.value.clone()),
954 _ => {}
955 }
956 fields.checksum_type = Some(match checksum.checksum_type.as_str() {
957 "COMPOSITE" => ChecksumType::Composite,
958 _ => ChecksumType::FullObject,
959 });
960 fields
961}
962
963fn extract_checksum_from_put(
969 input: &PutObjectInput,
970) -> Result<Option<ChecksumData>, S3ServiceError> {
971 let candidates: [(&str, &Option<String>); 5] = [
972 ("CRC32", &input.checksum_crc32),
973 ("CRC32C", &input.checksum_crc32c),
974 ("CRC64NVME", &input.checksum_crc64nvme),
975 ("SHA1", &input.checksum_sha1),
976 ("SHA256", &input.checksum_sha256),
977 ];
978 let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
979 if found.len() > 1 {
980 return Err(S3ServiceError::InvalidArgument {
981 message: "Only one checksum value can be provided per request".to_owned(),
982 });
983 }
984 Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
985 algorithm: (*alg).to_owned(),
986 value: val.as_ref().unwrap_or(&String::new()).clone(),
987 checksum_type: "FULL_OBJECT".to_owned(),
988 }))
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `raw` as a copy source and checks all three components at once.
    fn assert_copy_source(raw: &str, bucket: &str, key: &str, version: Option<&str>) {
        let (got_bucket, got_key, got_version) = parse_copy_source(raw).unwrap();
        assert_eq!(got_bucket, bucket);
        assert_eq!(got_key, key);
        assert_eq!(got_version.as_deref(), version);
    }

    /// Builds an owned tag pair for comparisons against parsed tagging headers.
    fn tag(key: &str, value: &str) -> (String, String) {
        (key.to_owned(), value.to_owned())
    }

    #[test]
    fn test_should_parse_copy_source_simple() {
        assert_copy_source("my-bucket/my-key", "my-bucket", "my-key", None);
    }

    #[test]
    fn test_should_parse_copy_source_with_leading_slash() {
        assert_copy_source("/my-bucket/my-key", "my-bucket", "my-key", None);
    }

    #[test]
    fn test_should_parse_copy_source_with_version_id() {
        assert_copy_source(
            "/my-bucket/my-key?versionId=abc123",
            "my-bucket",
            "my-key",
            Some("abc123"),
        );
    }

    #[test]
    fn test_should_parse_copy_source_with_nested_key() {
        assert_copy_source("bucket/path/to/key", "bucket", "path/to/key", None);
    }

    #[test]
    fn test_should_parse_copy_source_with_encoded_key() {
        // Percent escapes in the key are decoded: %20 -> space, %2B -> plus.
        assert_copy_source("bucket/path%20to/key%2B1", "bucket", "path to/key+1", None);
    }

    #[test]
    fn test_should_reject_copy_source_no_key() {
        assert!(parse_copy_source("bucket-only").is_err());
    }

    #[test]
    fn test_should_reject_copy_source_empty_bucket() {
        assert!(parse_copy_source("/").is_err());
    }

    #[test]
    fn test_should_reject_copy_source_empty_key() {
        assert!(parse_copy_source("bucket/").is_err());
    }

    #[test]
    fn test_should_parse_tagging_header_basic() {
        let parsed = parse_tagging_header("key1=value1&key2=value2");
        assert_eq!(parsed, vec![tag("key1", "value1"), tag("key2", "value2")]);
    }

    #[test]
    fn test_should_parse_tagging_header_encoded() {
        let parsed = parse_tagging_header("key%201=value%201");
        assert_eq!(parsed, vec![tag("key 1", "value 1")]);
    }

    #[test]
    fn test_should_parse_tagging_header_empty() {
        let parsed = parse_tagging_header("");
        assert!(parsed.is_empty());
    }

    #[test]
    fn test_should_parse_tagging_header_no_value() {
        // A segment without `=` becomes a key with an empty value.
        let parsed = parse_tagging_header("key1");
        assert_eq!(parsed, vec![tag("key1", "")]);
    }
}