1use std::str::FromStr;
8
9use chrono::Utc;
10use rustack_s3_model::{
11 error::{S3Error, S3ErrorCode},
12 input::{
13 AbortMultipartUploadInput, CompleteMultipartUploadInput, CreateMultipartUploadInput,
14 ListMultipartUploadsInput, ListPartsInput, UploadPartCopyInput, UploadPartInput,
15 },
16 output::{
17 AbortMultipartUploadOutput, CompleteMultipartUploadOutput, CreateMultipartUploadOutput,
18 ListMultipartUploadsOutput, ListPartsOutput, UploadPartCopyOutput, UploadPartOutput,
19 },
20 types::{
21 ChecksumAlgorithm, ChecksumType, CopyPartResult, Initiator,
22 MultipartUpload as ModelMultipartUpload, Part, StorageClass,
23 },
24};
25use tracing::debug;
26
27use crate::{
28 checksums::{
29 ChecksumAlgorithm as CoreChecksumAlgorithm, compute_checksum, compute_composite_checksum,
30 },
31 error::S3ServiceError,
32 provider::RustackS3,
33 state::{
34 multipart::{MultipartUpload, UploadPart},
35 object::{ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
36 },
37 utils::{generate_upload_id, parse_copy_source},
38 validation::{validate_content_md5, validate_object_key},
39};
40
41const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;
44
45use super::bucket::to_model_owner;
46
47#[allow(
50 clippy::cast_possible_wrap,
51 clippy::cast_possible_truncation,
52 clippy::cast_sign_loss,
53 clippy::unused_async
54)]
55impl RustackS3 {
56 #[allow(clippy::too_many_lines)]
58 pub async fn handle_create_multipart_upload(
59 &self,
60 input: CreateMultipartUploadInput,
61 ) -> Result<CreateMultipartUploadOutput, S3Error> {
62 let bucket_name = input.bucket;
63 let key = input.key;
64
65 validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
66
67 let bucket = self
68 .state
69 .get_bucket(&bucket_name)
70 .map_err(S3ServiceError::into_s3_error)?;
71
72 let upload_id = generate_upload_id();
73
74 let metadata = ObjectMetadata {
76 content_type: input.content_type.clone(),
77 content_encoding: input.content_encoding.clone(),
78 content_disposition: input.content_disposition.clone(),
79 content_language: input.content_language.clone(),
80 cache_control: input.cache_control.clone(),
81 expires: None,
82 user_metadata: input.metadata.clone(),
83 sse_algorithm: input
84 .server_side_encryption
85 .as_ref()
86 .map(|s| s.as_str().to_owned()),
87 sse_kms_key_id: input.ssekms_key_id.clone(),
88 sse_bucket_key_enabled: input.bucket_key_enabled,
89 sse_customer_algorithm: input.sse_customer_algorithm.clone(),
90 sse_customer_key_md5: input.sse_customer_key_md5.clone(),
91 tagging: input
92 .tagging
93 .as_ref()
94 .map(|t| super::object::parse_tagging_header(t))
95 .unwrap_or_default(),
96 acl: input
97 .acl
98 .as_ref()
99 .and_then(|a| a.as_str().parse().ok())
100 .unwrap_or_default(),
101 object_lock_mode: input
102 .object_lock_mode
103 .as_ref()
104 .map(|m| m.as_str().to_owned()),
105 object_lock_retain_until: input.object_lock_retain_until_date,
106 object_lock_legal_hold: input
107 .object_lock_legal_hold_status
108 .as_ref()
109 .map(|s| s.as_str() == "ON"),
110 };
111
112 let mut upload = MultipartUpload::new(
113 upload_id.clone(),
114 key.clone(),
115 InternalOwner::default(),
116 metadata,
117 );
118
119 upload.storage_class = input
120 .storage_class
121 .as_ref()
122 .map_or_else(|| "STANDARD".to_owned(), |s| s.as_str().to_owned());
123
124 upload.checksum_algorithm = input
125 .checksum_algorithm
126 .as_ref()
127 .map(|a| a.as_str().to_owned());
128
129 if let Some(ref algo_str) = upload.checksum_algorithm {
132 let requested_type = input
133 .checksum_type
134 .as_ref()
135 .map(|ct| ct.as_str().to_owned());
136 upload.checksum_type = Some(match algo_str.as_str() {
137 "CRC64NVME" => "FULL_OBJECT".to_owned(),
138 "SHA1" | "SHA256" => "COMPOSITE".to_owned(),
139 _ => requested_type.unwrap_or_else(|| "COMPOSITE".to_owned()),
140 });
141 }
142
143 upload.sse_algorithm = input
144 .server_side_encryption
145 .as_ref()
146 .map(|s| s.as_str().to_owned());
147
148 upload.sse_kms_key_id.clone_from(&input.ssekms_key_id);
149
150 let output_checksum_type = upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
151 "FULL_OBJECT" => ChecksumType::FullObject,
152 _ => ChecksumType::Composite,
153 });
154
155 bucket.multipart_uploads.insert(upload_id.clone(), upload);
156
157 debug!(
158 bucket = %bucket_name,
159 key = %key,
160 upload_id = %upload_id,
161 "create_multipart_upload completed"
162 );
163
164 Ok(CreateMultipartUploadOutput {
165 abort_date: None,
166 abort_rule_id: None,
167 bucket: Some(bucket_name),
168 bucket_key_enabled: None,
169 checksum_algorithm: input.checksum_algorithm,
170 checksum_type: output_checksum_type,
171 key: Some(key),
172 request_charged: None,
173 sse_customer_algorithm: None,
174 sse_customer_key_md5: None,
175 ssekms_encryption_context: None,
176 ssekms_key_id: None,
177 server_side_encryption: None,
178 upload_id: Some(upload_id),
179 })
180 }
181
182 #[allow(clippy::too_many_lines)]
184 pub async fn handle_upload_part(
185 &self,
186 mut input: UploadPartInput,
187 ) -> Result<UploadPartOutput, S3Error> {
188 let part_checksum = extract_checksum_from_part(&input)?;
190
191 let bucket_name = input.bucket;
192 let key = input.key;
193 let upload_id = input.upload_id;
194 let part_number = input.part_number;
195
196 if !(1..=10_000).contains(&part_number) {
197 return Err(S3Error::invalid_argument(
198 "Part number must be between 1 and 10000",
199 ));
200 }
201
202 let bucket = self
203 .state
204 .get_bucket(&bucket_name)
205 .map_err(S3ServiceError::into_s3_error)?;
206
207 let upload_checksum_algorithm = {
209 let upload_ref = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
210 S3ServiceError::NoSuchUpload {
211 upload_id: upload_id.clone(),
212 }
213 .into_s3_error()
214 })?;
215 upload_ref.checksum_algorithm.clone()
216 };
217
218 let body_data = input.body.take().map(|b| b.data).unwrap_or_default();
220
221 validate_content_md5(input.content_md5.as_deref(), &body_data)
223 .map_err(S3ServiceError::into_s3_error)?;
224
225 let checksum = if let Some(ref algo_str) = upload_checksum_algorithm {
228 if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
229 let computed = compute_checksum(algo, &body_data);
230 if let Some(ref client_cksum) = part_checksum {
231 if !client_cksum.algorithm.eq_ignore_ascii_case(algo_str) {
233 return Err(S3ServiceError::InvalidArgument {
234 message: format!(
235 "Checksum algorithm mismatch: expected {algo_str}, got {}",
236 client_cksum.algorithm
237 ),
238 }
239 .into_s3_error());
240 }
241 if client_cksum.value != computed {
243 return Err(S3ServiceError::BadDigest.into_s3_error());
244 }
245 }
246 Some(ChecksumData {
247 algorithm: algo_str.clone(),
248 value: computed,
249 checksum_type: "FULL_OBJECT".to_owned(),
250 })
251 } else {
252 part_checksum
253 }
254 } else {
255 part_checksum
256 };
257
258 let write_result = self
260 .storage
261 .write_part(&bucket_name, &upload_id, part_number as u32, body_data)
262 .await
263 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
264
265 let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
267 checksum_to_output_fields(checksum.as_ref());
268
269 let part = UploadPart {
271 part_number: part_number as u32,
272 etag: write_result.etag.clone(),
273 size: write_result.size,
274 last_modified: Utc::now(),
275 checksum,
276 };
277
278 if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
279 upload.put_part(part);
280 }
281
282 debug!(
283 bucket = %bucket_name,
284 key = %key,
285 upload_id = %upload_id,
286 part_number,
287 "upload_part completed"
288 );
289
290 Ok(UploadPartOutput {
291 bucket_key_enabled: None,
292 checksum_crc32: out_crc32,
293 checksum_crc32c: out_crc32c,
294 checksum_crc64nvme: out_crc64nvme,
295 checksum_sha1: out_sha1,
296 checksum_sha256: out_sha256,
297 e_tag: Some(write_result.etag),
298 request_charged: None,
299 sse_customer_algorithm: None,
300 sse_customer_key_md5: None,
301 ssekms_key_id: None,
302 server_side_encryption: None,
303 })
304 }
305
306 pub async fn handle_upload_part_copy(
308 &self,
309 input: UploadPartCopyInput,
310 ) -> Result<UploadPartCopyOutput, S3Error> {
311 let bucket_name = input.bucket;
312 let upload_id = input.upload_id;
313 let part_number = input.part_number;
314
315 let (src_bucket, src_key, src_version_id) =
316 parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
317
318 let src_vid = src_version_id.as_deref().unwrap_or("null");
320 let data = self
321 .storage
322 .read_object(&src_bucket, &src_key, src_vid, None)
323 .await
324 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
325
326 let write_result = self
328 .storage
329 .write_part(&bucket_name, &upload_id, part_number as u32, data)
330 .await
331 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
332
333 let bucket = self
335 .state
336 .get_bucket(&bucket_name)
337 .map_err(S3ServiceError::into_s3_error)?;
338
339 let part = UploadPart {
340 part_number: part_number as u32,
341 etag: write_result.etag.clone(),
342 size: write_result.size,
343 last_modified: Utc::now(),
344 checksum: None,
345 };
346
347 if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
348 upload.put_part(part);
349 }
350
351 let copy_result = CopyPartResult {
352 checksum_crc32: None,
353 checksum_crc32c: None,
354 checksum_crc64nvme: None,
355 checksum_sha1: None,
356 checksum_sha256: None,
357 e_tag: Some(write_result.etag),
358 last_modified: Some(Utc::now()),
359 };
360
361 Ok(UploadPartCopyOutput {
362 bucket_key_enabled: None,
363 copy_part_result: Some(copy_result),
364 copy_source_version_id: src_version_id,
365 request_charged: None,
366 sse_customer_algorithm: None,
367 sse_customer_key_md5: None,
368 ssekms_key_id: None,
369 server_side_encryption: None,
370 })
371 }
372
373 #[allow(clippy::too_many_lines)]
375 pub async fn handle_complete_multipart_upload(
376 &self,
377 input: CompleteMultipartUploadInput,
378 ) -> Result<CompleteMultipartUploadOutput, S3Error> {
379 let bucket_name = input.bucket;
380 let key = input.key;
381 let upload_id = input.upload_id;
382
383 let bucket = self
384 .state
385 .get_bucket(&bucket_name)
386 .map_err(S3ServiceError::into_s3_error)?;
387
388 let upload = bucket
390 .multipart_uploads
391 .get(&upload_id)
392 .ok_or_else(|| {
393 S3ServiceError::NoSuchUpload {
394 upload_id: upload_id.clone(),
395 }
396 .into_s3_error()
397 })?
398 .clone();
399
400 let requested_parts = input
402 .multipart_upload
403 .map(|mu| mu.parts)
404 .unwrap_or_default();
405
406 let mut part_numbers: Vec<u32> = Vec::with_capacity(requested_parts.len());
408 let mut last_num = 0i32;
409
410 for cp in &requested_parts {
411 let part_num = cp.part_number.ok_or_else(|| {
412 S3Error::with_message(S3ErrorCode::InvalidArgument, "Part number is required")
413 })?;
414
415 if part_num <= last_num {
416 return Err(S3ServiceError::InvalidPartOrder.into_s3_error());
417 }
418 last_num = part_num;
419
420 let part_num_u32 = u32::try_from(part_num).map_err(|_| {
421 S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid part number")
422 })?;
423
424 upload
426 .get_part(part_num_u32)
427 .ok_or_else(|| S3ServiceError::InvalidPart.into_s3_error())?;
428
429 part_numbers.push(part_num_u32);
430 }
431
432 if part_numbers.len() > 1 {
435 for &num in &part_numbers[..part_numbers.len() - 1] {
436 if let Some(part) = upload.get_part(num) {
437 if part.size < MIN_PART_SIZE {
438 return Err(S3Error::with_message(
439 S3ErrorCode::EntityTooSmall,
440 format!(
441 "Your proposed upload is smaller than the minimum allowed size. \
442 Part {num} has size {} bytes, minimum is {MIN_PART_SIZE}",
443 part.size
444 ),
445 ));
446 }
447 }
448 }
449 }
450
451 let version_id = if bucket.is_versioning_enabled() {
453 crate::utils::generate_version_id()
454 } else {
455 "null".to_owned()
456 };
457
458 let (write_result, _part_md5s) = self
460 .storage
461 .complete_multipart(&bucket_name, &upload_id, &key, &version_id, &part_numbers)
462 .await
463 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
464
465 let final_checksum = if let Some(ref algo_str) = upload.checksum_algorithm {
475 if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
476 let checksum_type_str = upload.checksum_type.as_deref().unwrap_or("COMPOSITE");
477
478 let value = if checksum_type_str == "FULL_OBJECT" {
479 let assembled = self
480 .storage
481 .read_object(&bucket_name, &key, &version_id, None)
482 .await
483 .map_err(|e| {
484 S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error()
485 })?;
486 compute_checksum(algo, &assembled)
487 } else {
488 let part_checksums: Vec<String> = part_numbers
490 .iter()
491 .filter_map(|&num| {
492 upload
493 .get_part(num)
494 .and_then(|p| p.checksum.as_ref())
495 .map(|c| c.value.clone())
496 })
497 .collect();
498 compute_composite_checksum(algo, &part_checksums)
499 };
500
501 Some(ChecksumData {
502 algorithm: algo_str.clone(),
503 value,
504 checksum_type: checksum_type_str.to_owned(),
505 })
506 } else {
507 None
508 }
509 } else {
510 None
511 };
512
513 let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
514 checksum_to_output_fields(final_checksum.as_ref());
515
516 let out_checksum_type = final_checksum
517 .as_ref()
518 .map(|c| match c.checksum_type.as_str() {
519 "FULL_OBJECT" => ChecksumType::FullObject,
520 _ => ChecksumType::Composite,
521 });
522
523 let obj = S3Object {
525 key: key.clone(),
526 version_id: version_id.clone(),
527 etag: write_result.etag.clone(),
528 size: write_result.size,
529 last_modified: Utc::now(),
530 storage_class: upload.storage_class.clone(),
531 metadata: upload.metadata.clone(),
532 owner: upload.owner.clone(),
533 checksum: final_checksum,
534 parts_count: Some(part_numbers.len() as u32),
535 part_etags: requested_parts
536 .iter()
537 .filter_map(|p| p.e_tag.clone())
538 .collect(),
539 };
540
541 {
542 let mut store = bucket.objects.write();
543 store.put(obj);
544 }
545
546 bucket.multipart_uploads.remove(&upload_id);
548
549 debug!(
550 bucket = %bucket_name,
551 key = %key,
552 upload_id = %upload_id,
553 parts = part_numbers.len(),
554 "complete_multipart_upload completed"
555 );
556
557 let real_version_id = if version_id == "null" {
558 None
559 } else {
560 Some(version_id)
561 };
562
563 Ok(CompleteMultipartUploadOutput {
564 bucket: Some(bucket_name.clone()),
565 bucket_key_enabled: None,
566 checksum_crc32: out_crc32,
567 checksum_crc32c: out_crc32c,
568 checksum_crc64nvme: out_crc64nvme,
569 checksum_sha1: out_sha1,
570 checksum_sha256: out_sha256,
571 checksum_type: out_checksum_type,
572 e_tag: Some(write_result.etag),
573 expiration: None,
574 key: Some(key),
575 location: Some(format!("http://s3.amazonaws.com/{bucket_name}")),
576 request_charged: None,
577 ssekms_key_id: None,
578 server_side_encryption: None,
579 version_id: real_version_id,
580 })
581 }
582
583 pub async fn handle_abort_multipart_upload(
585 &self,
586 input: AbortMultipartUploadInput,
587 ) -> Result<AbortMultipartUploadOutput, S3Error> {
588 let bucket_name = input.bucket;
589 let upload_id = input.upload_id;
590
591 let bucket = self
592 .state
593 .get_bucket(&bucket_name)
594 .map_err(S3ServiceError::into_s3_error)?;
595
596 bucket.multipart_uploads.remove(&upload_id);
598
599 self.storage.abort_multipart(&bucket_name, &upload_id);
601
602 debug!(
603 bucket = %bucket_name,
604 upload_id = %upload_id,
605 "abort_multipart_upload completed"
606 );
607
608 Ok(AbortMultipartUploadOutput {
609 request_charged: None,
610 })
611 }
612
613 pub async fn handle_list_parts(
615 &self,
616 input: ListPartsInput,
617 ) -> Result<ListPartsOutput, S3Error> {
618 let bucket_name = input.bucket;
619 let key = input.key;
620 let upload_id = input.upload_id;
621
622 let bucket = self
623 .state
624 .get_bucket(&bucket_name)
625 .map_err(S3ServiceError::into_s3_error)?;
626
627 let upload = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
628 S3ServiceError::NoSuchUpload {
629 upload_id: upload_id.clone(),
630 }
631 .into_s3_error()
632 })?;
633
634 let max_parts = input.max_parts.unwrap_or(1000) as usize;
635 let part_number_marker: u32 = input
636 .part_number_marker
637 .as_deref()
638 .and_then(|s| s.parse().ok())
639 .unwrap_or(0);
640
641 let all_parts: Vec<&UploadPart> = upload
642 .parts
643 .values()
644 .filter(|p| p.part_number > part_number_marker)
645 .collect();
646
647 let is_truncated = all_parts.len() > max_parts;
648 let parts_to_return = &all_parts[..all_parts.len().min(max_parts)];
649
650 let s3_parts: Vec<Part> = parts_to_return
651 .iter()
652 .map(|p| {
653 let (crc32, crc32c, crc64nvme, sha1, sha256) =
654 checksum_to_output_fields(p.checksum.as_ref());
655 Part {
656 checksum_crc32: crc32,
657 checksum_crc32c: crc32c,
658 checksum_crc64nvme: crc64nvme,
659 checksum_sha1: sha1,
660 checksum_sha256: sha256,
661 e_tag: Some(p.etag.clone()),
662 last_modified: Some(p.last_modified),
663 part_number: Some(p.part_number as i32),
664 size: Some(p.size as i64),
665 }
666 })
667 .collect();
668
669 let next_marker = if is_truncated {
670 s3_parts.last().and_then(|p| p.part_number)
671 } else {
672 None
673 };
674
675 let owner = to_model_owner(&upload.owner);
676
677 Ok(ListPartsOutput {
678 abort_date: None,
679 abort_rule_id: None,
680 bucket: Some(bucket_name),
681 checksum_algorithm: upload
682 .checksum_algorithm
683 .as_ref()
684 .map(|a| ChecksumAlgorithm::from(a.as_str())),
685 checksum_type: upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
686 "FULL_OBJECT" => ChecksumType::FullObject,
687 _ => ChecksumType::Composite,
688 }),
689 initiator: Some(Initiator {
690 display_name: Some(upload.owner.display_name.clone()),
691 id: Some(upload.owner.id.clone()),
692 }),
693 is_truncated: Some(is_truncated),
694 key: Some(key),
695 max_parts: Some(max_parts as i32),
696 next_part_number_marker: next_marker.map(|n| n.to_string()),
697 owner: Some(owner),
698 part_number_marker: Some(part_number_marker.to_string()),
699 parts: s3_parts,
700 request_charged: None,
701 storage_class: Some(StorageClass::from(upload.storage_class.as_str())),
702 upload_id: Some(upload_id),
703 })
704 }
705
706 pub async fn handle_list_multipart_uploads(
708 &self,
709 input: ListMultipartUploadsInput,
710 ) -> Result<ListMultipartUploadsOutput, S3Error> {
711 let bucket_name = input.bucket;
712
713 let bucket = self
714 .state
715 .get_bucket(&bucket_name)
716 .map_err(S3ServiceError::into_s3_error)?;
717
718 let prefix = input.prefix.unwrap_or_default();
719 let max_uploads = input.max_uploads.unwrap_or(1000) as usize;
720
721 let mut uploads: Vec<MultipartUpload> = bucket
722 .multipart_uploads
723 .iter()
724 .filter(|entry| entry.key.starts_with(&prefix))
725 .map(|entry| entry.value().clone())
726 .collect();
727
728 uploads.sort_by(|a, b| a.key.cmp(&b.key).then(a.initiated.cmp(&b.initiated)));
730
731 let is_truncated = uploads.len() > max_uploads;
732 let uploads_to_return = &uploads[..uploads.len().min(max_uploads)];
733
734 let s3_uploads: Vec<ModelMultipartUpload> = uploads_to_return
735 .iter()
736 .map(|u| ModelMultipartUpload {
737 checksum_algorithm: u
738 .checksum_algorithm
739 .as_ref()
740 .map(|a| ChecksumAlgorithm::from(a.as_str())),
741 checksum_type: None,
742 initiated: Some(u.initiated),
743 initiator: Some(Initiator {
744 display_name: Some(u.owner.display_name.clone()),
745 id: Some(u.owner.id.clone()),
746 }),
747 key: Some(u.key.clone()),
748 owner: Some(to_model_owner(&u.owner)),
749 storage_class: Some(StorageClass::from(u.storage_class.as_str())),
750 upload_id: Some(u.upload_id.clone()),
751 })
752 .collect();
753
754 let next_key_marker = if is_truncated {
755 s3_uploads.last().and_then(|u| u.key.clone())
756 } else {
757 None
758 };
759 let next_upload_id_marker = if is_truncated {
760 s3_uploads.last().and_then(|u| u.upload_id.clone())
761 } else {
762 None
763 };
764
765 Ok(ListMultipartUploadsOutput {
766 bucket: Some(bucket_name),
767 common_prefixes: Vec::new(),
768 delimiter: input.delimiter,
769 encoding_type: None,
770 is_truncated: Some(is_truncated),
771 key_marker: input.key_marker,
772 max_uploads: Some(max_uploads as i32),
773 next_key_marker,
774 next_upload_id_marker,
775 prefix: Some(prefix),
776 request_charged: None,
777 upload_id_marker: input.upload_id_marker,
778 uploads: s3_uploads,
779 })
780 }
781}
782
783type ChecksumOutputFields = (
785 Option<String>,
786 Option<String>,
787 Option<String>,
788 Option<String>,
789 Option<String>,
790);
791
792#[allow(clippy::result_large_err)]
797fn extract_checksum_from_part(input: &UploadPartInput) -> Result<Option<ChecksumData>, S3Error> {
798 let candidates: [(&str, &Option<String>); 5] = [
799 ("CRC32", &input.checksum_crc32),
800 ("CRC32C", &input.checksum_crc32c),
801 ("CRC64NVME", &input.checksum_crc64nvme),
802 ("SHA1", &input.checksum_sha1),
803 ("SHA256", &input.checksum_sha256),
804 ];
805 let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
806 if found.len() > 1 {
807 return Err(S3ServiceError::InvalidArgument {
808 message: "Only one checksum value can be provided per request".to_owned(),
809 }
810 .into_s3_error());
811 }
812 Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
813 algorithm: (*alg).to_owned(),
814 value: val.as_ref().unwrap_or(&String::new()).clone(),
815 checksum_type: "FULL_OBJECT".to_owned(),
816 }))
817}
818
819fn checksum_to_output_fields(checksum: Option<&ChecksumData>) -> ChecksumOutputFields {
822 let Some(c) = checksum else {
823 return (None, None, None, None, None);
824 };
825 match c.algorithm.as_str() {
826 "CRC32" => (Some(c.value.clone()), None, None, None, None),
827 "CRC32C" => (None, Some(c.value.clone()), None, None, None),
828 "CRC64NVME" => (None, None, Some(c.value.clone()), None, None),
829 "SHA1" => (None, None, None, Some(c.value.clone()), None),
830 "SHA256" => (None, None, None, None, Some(c.value.clone())),
831 _ => (None, None, None, None, None),
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use super::*;
838
839 #[test]
840 fn test_should_parse_copy_source_basic() {
841 let (bucket, key, version) = parse_copy_source("/my-bucket/my-key").unwrap();
842 assert_eq!(bucket, "my-bucket");
843 assert_eq!(key, "my-key");
844 assert!(version.is_none());
845 }
846
847 #[test]
848 fn test_should_parse_copy_source_without_leading_slash() {
849 let (bucket, key, version) = parse_copy_source("my-bucket/my-key").unwrap();
850 assert_eq!(bucket, "my-bucket");
851 assert_eq!(key, "my-key");
852 assert!(version.is_none());
853 }
854
855 #[test]
856 fn test_should_parse_copy_source_with_version_id() {
857 let (bucket, key, version) =
858 parse_copy_source("/my-bucket/my-key?versionId=abc123").unwrap();
859 assert_eq!(bucket, "my-bucket");
860 assert_eq!(key, "my-key");
861 assert_eq!(version.as_deref(), Some("abc123"));
862 }
863
864 #[test]
865 fn test_should_parse_copy_source_with_nested_key() {
866 let (bucket, key, version) =
867 parse_copy_source("/my-bucket/path/to/my-key?versionId=v1").unwrap();
868 assert_eq!(bucket, "my-bucket");
869 assert_eq!(key, "path/to/my-key");
870 assert_eq!(version.as_deref(), Some("v1"));
871 }
872
873 #[test]
874 fn test_should_fail_on_invalid_copy_source() {
875 let result = parse_copy_source("no-slash");
876 assert!(result.is_err());
877 }
878
879 #[test]
880 fn test_should_fail_on_empty_bucket() {
881 let result = parse_copy_source("/");
882 assert!(result.is_err());
883 }
884}