1use std::str::FromStr;
8
9use chrono::Utc;
10use rustack_s3_model::{
11 error::{S3Error, S3ErrorCode},
12 input::{
13 AbortMultipartUploadInput, CompleteMultipartUploadInput, CreateMultipartUploadInput,
14 ListMultipartUploadsInput, ListPartsInput, UploadPartCopyInput, UploadPartInput,
15 },
16 output::{
17 AbortMultipartUploadOutput, CompleteMultipartUploadOutput, CreateMultipartUploadOutput,
18 ListMultipartUploadsOutput, ListPartsOutput, UploadPartCopyOutput, UploadPartOutput,
19 },
20 types::{
21 ChecksumAlgorithm, ChecksumType, CopyPartResult, Initiator,
22 MultipartUpload as ModelMultipartUpload, Part, StorageClass,
23 },
24};
25use tracing::debug;
26
27use crate::{
28 checksums::{
29 ChecksumAlgorithm as CoreChecksumAlgorithm, compute_checksum, compute_composite_checksum,
30 },
31 error::S3ServiceError,
32 provider::RustackS3,
33 state::{
34 multipart::{MultipartUpload, UploadPart},
35 object::{ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
36 },
37 utils::{generate_upload_id, parse_copy_source},
38 validation::{validate_content_md5, validate_object_key},
39};
40
/// Minimum size in bytes (5 MiB) required of every part except the last one
/// when a multipart upload is completed; smaller parts are rejected with
/// `EntityTooSmall`.
const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;
44
45use super::bucket::to_model_owner;
46
47#[allow(
50 clippy::cast_possible_wrap,
51 clippy::cast_possible_truncation,
52 clippy::cast_sign_loss,
53 clippy::unused_async
54)]
55impl RustackS3 {
56 #[allow(clippy::too_many_lines)]
58 pub async fn handle_create_multipart_upload(
59 &self,
60 input: CreateMultipartUploadInput,
61 ) -> Result<CreateMultipartUploadOutput, S3Error> {
62 let bucket_name = input.bucket;
63 let key = input.key;
64
65 validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
66
67 let bucket = self
68 .state
69 .get_bucket(&bucket_name)
70 .map_err(S3ServiceError::into_s3_error)?;
71
72 let upload_id = generate_upload_id();
73
74 let metadata = ObjectMetadata {
76 content_type: input.content_type.clone(),
77 content_encoding: input.content_encoding.clone(),
78 content_disposition: input.content_disposition.clone(),
79 content_language: input.content_language.clone(),
80 cache_control: input.cache_control.clone(),
81 expires: None,
82 user_metadata: input.metadata.clone(),
83 sse_algorithm: input
84 .server_side_encryption
85 .as_ref()
86 .map(|s| s.as_str().to_owned()),
87 sse_kms_key_id: input.ssekms_key_id.clone(),
88 sse_bucket_key_enabled: input.bucket_key_enabled,
89 sse_customer_algorithm: input.sse_customer_algorithm.clone(),
90 sse_customer_key_md5: input.sse_customer_key_md5.clone(),
91 tagging: input
92 .tagging
93 .as_ref()
94 .map(|t| super::object::parse_tagging_header(t))
95 .unwrap_or_default(),
96 acl: input
97 .acl
98 .as_ref()
99 .and_then(|a| a.as_str().parse().ok())
100 .unwrap_or_default(),
101 object_lock_mode: input
102 .object_lock_mode
103 .as_ref()
104 .map(|m| m.as_str().to_owned()),
105 object_lock_retain_until: input.object_lock_retain_until_date,
106 object_lock_legal_hold: input
107 .object_lock_legal_hold_status
108 .as_ref()
109 .map(|s| s.as_str() == "ON"),
110 };
111
112 let mut upload = MultipartUpload::new(
113 upload_id.clone(),
114 key.clone(),
115 InternalOwner::default(),
116 metadata,
117 );
118
119 upload.storage_class = input
120 .storage_class
121 .as_ref()
122 .map_or_else(|| "STANDARD".to_owned(), |s| s.as_str().to_owned());
123
124 upload.checksum_algorithm = input
125 .checksum_algorithm
126 .as_ref()
127 .map(|a| a.as_str().to_owned());
128
129 if let Some(ref algo_str) = upload.checksum_algorithm {
132 let requested_type = input
133 .checksum_type
134 .as_ref()
135 .map(|ct| ct.as_str().to_owned());
136 upload.checksum_type = Some(match algo_str.as_str() {
137 "CRC64NVME" => "FULL_OBJECT".to_owned(),
138 "SHA1" | "SHA256" => "COMPOSITE".to_owned(),
139 _ => requested_type.unwrap_or_else(|| "COMPOSITE".to_owned()),
140 });
141 }
142
143 upload.sse_algorithm = input
144 .server_side_encryption
145 .as_ref()
146 .map(|s| s.as_str().to_owned());
147
148 upload.sse_kms_key_id.clone_from(&input.ssekms_key_id);
149
150 let output_checksum_type = upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
151 "FULL_OBJECT" => ChecksumType::FullObject,
152 _ => ChecksumType::Composite,
153 });
154
155 bucket.multipart_uploads.insert(upload_id.clone(), upload);
156
157 debug!(
158 bucket = %bucket_name,
159 key = %key,
160 upload_id = %upload_id,
161 "create_multipart_upload completed"
162 );
163
164 Ok(CreateMultipartUploadOutput {
165 abort_date: None,
166 abort_rule_id: None,
167 bucket: Some(bucket_name),
168 bucket_key_enabled: None,
169 checksum_algorithm: input.checksum_algorithm,
170 checksum_type: output_checksum_type,
171 key: Some(key),
172 request_charged: None,
173 sse_customer_algorithm: None,
174 sse_customer_key_md5: None,
175 ssekms_encryption_context: None,
176 ssekms_key_id: None,
177 server_side_encryption: None,
178 upload_id: Some(upload_id),
179 })
180 }
181
182 #[allow(clippy::too_many_lines)]
184 pub async fn handle_upload_part(
185 &self,
186 mut input: UploadPartInput,
187 ) -> Result<UploadPartOutput, S3Error> {
188 let part_checksum = extract_checksum_from_part(&input)?;
190
191 let bucket_name = input.bucket;
192 let key = input.key;
193 let upload_id = input.upload_id;
194 let part_number = input.part_number;
195
196 if !(1..=10_000).contains(&part_number) {
197 return Err(S3Error::invalid_argument(
198 "Part number must be between 1 and 10000",
199 ));
200 }
201
202 let bucket = self
203 .state
204 .get_bucket(&bucket_name)
205 .map_err(S3ServiceError::into_s3_error)?;
206
207 let upload_checksum_algorithm = {
209 let upload_ref = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
210 S3ServiceError::NoSuchUpload {
211 upload_id: upload_id.clone(),
212 }
213 .into_s3_error()
214 })?;
215 upload_ref.checksum_algorithm.clone()
216 };
217
218 let body_data = input.body.take().map(|b| b.data).unwrap_or_default();
220
221 validate_content_md5(input.content_md5.as_deref(), &body_data)
223 .map_err(S3ServiceError::into_s3_error)?;
224
225 let checksum = if let Some(ref algo_str) = upload_checksum_algorithm {
228 if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
229 let computed = compute_checksum(algo, &body_data);
230 if let Some(ref client_cksum) = part_checksum {
231 if !client_cksum.algorithm.eq_ignore_ascii_case(algo_str) {
233 return Err(S3ServiceError::InvalidArgument {
234 message: format!(
235 "Checksum algorithm mismatch: expected {algo_str}, got {}",
236 client_cksum.algorithm
237 ),
238 }
239 .into_s3_error());
240 }
241 if client_cksum.value != computed {
243 return Err(S3ServiceError::BadDigest.into_s3_error());
244 }
245 }
246 Some(ChecksumData {
247 algorithm: algo_str.clone(),
248 value: computed,
249 checksum_type: "FULL_OBJECT".to_owned(),
250 })
251 } else {
252 part_checksum
253 }
254 } else {
255 part_checksum
256 };
257
258 let write_result = self
260 .storage
261 .write_part(&bucket_name, &upload_id, part_number as u32, body_data)
262 .await
263 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
264
265 let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
267 checksum_to_output_fields(checksum.as_ref());
268
269 let part = UploadPart {
271 part_number: part_number as u32,
272 etag: write_result.etag.clone(),
273 size: write_result.size,
274 last_modified: Utc::now(),
275 checksum,
276 };
277
278 if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
279 upload.put_part(part);
280 }
281
282 debug!(
283 bucket = %bucket_name,
284 key = %key,
285 upload_id = %upload_id,
286 part_number,
287 "upload_part completed"
288 );
289
290 Ok(UploadPartOutput {
291 bucket_key_enabled: None,
292 checksum_crc32: out_crc32,
293 checksum_crc32c: out_crc32c,
294 checksum_crc64nvme: out_crc64nvme,
295 checksum_sha1: out_sha1,
296 checksum_sha256: out_sha256,
297 e_tag: Some(write_result.etag),
298 request_charged: None,
299 sse_customer_algorithm: None,
300 sse_customer_key_md5: None,
301 ssekms_key_id: None,
302 server_side_encryption: None,
303 })
304 }
305
306 pub async fn handle_upload_part_copy(
308 &self,
309 input: UploadPartCopyInput,
310 ) -> Result<UploadPartCopyOutput, S3Error> {
311 let bucket_name = input.bucket;
312 let upload_id = input.upload_id;
313 let part_number = input.part_number;
314
315 let (src_bucket, src_key, src_version_id) =
316 parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
317
318 let src_vid = src_version_id.as_deref().unwrap_or("null");
320 let data = self
321 .storage
322 .read_object(&src_bucket, &src_key, src_vid, None)
323 .await
324 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
325
326 let write_result = self
328 .storage
329 .write_part(&bucket_name, &upload_id, part_number as u32, data)
330 .await
331 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
332
333 let bucket = self
335 .state
336 .get_bucket(&bucket_name)
337 .map_err(S3ServiceError::into_s3_error)?;
338
339 let part = UploadPart {
340 part_number: part_number as u32,
341 etag: write_result.etag.clone(),
342 size: write_result.size,
343 last_modified: Utc::now(),
344 checksum: None,
345 };
346
347 if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
348 upload.put_part(part);
349 }
350
351 let copy_result = CopyPartResult {
352 checksum_crc32: None,
353 checksum_crc32c: None,
354 checksum_crc64nvme: None,
355 checksum_sha1: None,
356 checksum_sha256: None,
357 e_tag: Some(write_result.etag),
358 last_modified: Some(Utc::now()),
359 };
360
361 Ok(UploadPartCopyOutput {
362 bucket_key_enabled: None,
363 copy_part_result: Some(copy_result),
364 copy_source_version_id: src_version_id,
365 request_charged: None,
366 sse_customer_algorithm: None,
367 sse_customer_key_md5: None,
368 ssekms_key_id: None,
369 server_side_encryption: None,
370 })
371 }
372
373 #[allow(clippy::too_many_lines)]
375 pub async fn handle_complete_multipart_upload(
376 &self,
377 input: CompleteMultipartUploadInput,
378 ) -> Result<CompleteMultipartUploadOutput, S3Error> {
379 let bucket_name = input.bucket;
380 let key = input.key;
381 let upload_id = input.upload_id;
382
383 let bucket = self
384 .state
385 .get_bucket(&bucket_name)
386 .map_err(S3ServiceError::into_s3_error)?;
387
388 let upload = bucket
390 .multipart_uploads
391 .get(&upload_id)
392 .ok_or_else(|| {
393 S3ServiceError::NoSuchUpload {
394 upload_id: upload_id.clone(),
395 }
396 .into_s3_error()
397 })?
398 .clone();
399
400 let requested_parts = input
402 .multipart_upload
403 .map(|mu| mu.parts)
404 .unwrap_or_default();
405
406 let mut part_numbers: Vec<u32> = Vec::with_capacity(requested_parts.len());
408 let mut last_num = 0i32;
409
410 for cp in &requested_parts {
411 let part_num = cp.part_number.ok_or_else(|| {
412 S3Error::with_message(S3ErrorCode::InvalidArgument, "Part number is required")
413 })?;
414
415 if part_num <= last_num {
416 return Err(S3ServiceError::InvalidPartOrder.into_s3_error());
417 }
418 last_num = part_num;
419
420 let part_num_u32 = u32::try_from(part_num).map_err(|_| {
421 S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid part number")
422 })?;
423
424 upload
426 .get_part(part_num_u32)
427 .ok_or_else(|| S3ServiceError::InvalidPart.into_s3_error())?;
428
429 part_numbers.push(part_num_u32);
430 }
431
432 if part_numbers.len() > 1 {
435 for &num in &part_numbers[..part_numbers.len() - 1] {
436 if let Some(part) = upload.get_part(num) {
437 if part.size < MIN_PART_SIZE {
438 return Err(S3Error::with_message(
439 S3ErrorCode::EntityTooSmall,
440 format!(
441 "Your proposed upload is smaller than the minimum allowed size. \
442 Part {num} has size {} bytes, minimum is {MIN_PART_SIZE}",
443 part.size
444 ),
445 ));
446 }
447 }
448 }
449 }
450
451 let version_id = if bucket.is_versioning_enabled() {
453 crate::utils::generate_version_id()
454 } else {
455 "null".to_owned()
456 };
457
458 let (write_result, _part_md5s) = self
460 .storage
461 .complete_multipart(&bucket_name, &upload_id, &key, &version_id, &part_numbers)
462 .await
463 .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
464
465 let final_checksum = if let Some(ref algo_str) = upload.checksum_algorithm {
468 if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
469 let checksum_type_str = upload.checksum_type.as_deref().unwrap_or("COMPOSITE");
470
471 let part_checksums: Vec<String> = part_numbers
473 .iter()
474 .filter_map(|&num| {
475 upload
476 .get_part(num)
477 .and_then(|p| p.checksum.as_ref())
478 .map(|c| c.value.clone())
479 })
480 .collect();
481
482 let value = compute_composite_checksum(algo, &part_checksums);
487
488 Some(ChecksumData {
489 algorithm: algo_str.clone(),
490 value,
491 checksum_type: checksum_type_str.to_owned(),
492 })
493 } else {
494 None
495 }
496 } else {
497 None
498 };
499
500 let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
501 checksum_to_output_fields(final_checksum.as_ref());
502
503 let out_checksum_type = final_checksum
504 .as_ref()
505 .map(|c| match c.checksum_type.as_str() {
506 "FULL_OBJECT" => ChecksumType::FullObject,
507 _ => ChecksumType::Composite,
508 });
509
510 let obj = S3Object {
512 key: key.clone(),
513 version_id: version_id.clone(),
514 etag: write_result.etag.clone(),
515 size: write_result.size,
516 last_modified: Utc::now(),
517 storage_class: upload.storage_class.clone(),
518 metadata: upload.metadata.clone(),
519 owner: upload.owner.clone(),
520 checksum: final_checksum,
521 parts_count: Some(part_numbers.len() as u32),
522 part_etags: requested_parts
523 .iter()
524 .filter_map(|p| p.e_tag.clone())
525 .collect(),
526 };
527
528 {
529 let mut store = bucket.objects.write();
530 store.put(obj);
531 }
532
533 bucket.multipart_uploads.remove(&upload_id);
535
536 debug!(
537 bucket = %bucket_name,
538 key = %key,
539 upload_id = %upload_id,
540 parts = part_numbers.len(),
541 "complete_multipart_upload completed"
542 );
543
544 let real_version_id = if version_id == "null" {
545 None
546 } else {
547 Some(version_id)
548 };
549
550 Ok(CompleteMultipartUploadOutput {
551 bucket: Some(bucket_name.clone()),
552 bucket_key_enabled: None,
553 checksum_crc32: out_crc32,
554 checksum_crc32c: out_crc32c,
555 checksum_crc64nvme: out_crc64nvme,
556 checksum_sha1: out_sha1,
557 checksum_sha256: out_sha256,
558 checksum_type: out_checksum_type,
559 e_tag: Some(write_result.etag),
560 expiration: None,
561 key: Some(key),
562 location: Some(format!("http://s3.amazonaws.com/{bucket_name}")),
563 request_charged: None,
564 ssekms_key_id: None,
565 server_side_encryption: None,
566 version_id: real_version_id,
567 })
568 }
569
570 pub async fn handle_abort_multipart_upload(
572 &self,
573 input: AbortMultipartUploadInput,
574 ) -> Result<AbortMultipartUploadOutput, S3Error> {
575 let bucket_name = input.bucket;
576 let upload_id = input.upload_id;
577
578 let bucket = self
579 .state
580 .get_bucket(&bucket_name)
581 .map_err(S3ServiceError::into_s3_error)?;
582
583 bucket.multipart_uploads.remove(&upload_id);
585
586 self.storage.abort_multipart(&bucket_name, &upload_id);
588
589 debug!(
590 bucket = %bucket_name,
591 upload_id = %upload_id,
592 "abort_multipart_upload completed"
593 );
594
595 Ok(AbortMultipartUploadOutput {
596 request_charged: None,
597 })
598 }
599
600 pub async fn handle_list_parts(
602 &self,
603 input: ListPartsInput,
604 ) -> Result<ListPartsOutput, S3Error> {
605 let bucket_name = input.bucket;
606 let key = input.key;
607 let upload_id = input.upload_id;
608
609 let bucket = self
610 .state
611 .get_bucket(&bucket_name)
612 .map_err(S3ServiceError::into_s3_error)?;
613
614 let upload = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
615 S3ServiceError::NoSuchUpload {
616 upload_id: upload_id.clone(),
617 }
618 .into_s3_error()
619 })?;
620
621 let max_parts = input.max_parts.unwrap_or(1000) as usize;
622 let part_number_marker: u32 = input
623 .part_number_marker
624 .as_deref()
625 .and_then(|s| s.parse().ok())
626 .unwrap_or(0);
627
628 let all_parts: Vec<&UploadPart> = upload
629 .parts
630 .values()
631 .filter(|p| p.part_number > part_number_marker)
632 .collect();
633
634 let is_truncated = all_parts.len() > max_parts;
635 let parts_to_return = &all_parts[..all_parts.len().min(max_parts)];
636
637 let s3_parts: Vec<Part> = parts_to_return
638 .iter()
639 .map(|p| {
640 let (crc32, crc32c, crc64nvme, sha1, sha256) =
641 checksum_to_output_fields(p.checksum.as_ref());
642 Part {
643 checksum_crc32: crc32,
644 checksum_crc32c: crc32c,
645 checksum_crc64nvme: crc64nvme,
646 checksum_sha1: sha1,
647 checksum_sha256: sha256,
648 e_tag: Some(p.etag.clone()),
649 last_modified: Some(p.last_modified),
650 part_number: Some(p.part_number as i32),
651 size: Some(p.size as i64),
652 }
653 })
654 .collect();
655
656 let next_marker = if is_truncated {
657 s3_parts.last().and_then(|p| p.part_number)
658 } else {
659 None
660 };
661
662 let owner = to_model_owner(&upload.owner);
663
664 Ok(ListPartsOutput {
665 abort_date: None,
666 abort_rule_id: None,
667 bucket: Some(bucket_name),
668 checksum_algorithm: upload
669 .checksum_algorithm
670 .as_ref()
671 .map(|a| ChecksumAlgorithm::from(a.as_str())),
672 checksum_type: upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
673 "FULL_OBJECT" => ChecksumType::FullObject,
674 _ => ChecksumType::Composite,
675 }),
676 initiator: Some(Initiator {
677 display_name: Some(upload.owner.display_name.clone()),
678 id: Some(upload.owner.id.clone()),
679 }),
680 is_truncated: Some(is_truncated),
681 key: Some(key),
682 max_parts: Some(max_parts as i32),
683 next_part_number_marker: next_marker.map(|n| n.to_string()),
684 owner: Some(owner),
685 part_number_marker: Some(part_number_marker.to_string()),
686 parts: s3_parts,
687 request_charged: None,
688 storage_class: Some(StorageClass::from(upload.storage_class.as_str())),
689 upload_id: Some(upload_id),
690 })
691 }
692
693 pub async fn handle_list_multipart_uploads(
695 &self,
696 input: ListMultipartUploadsInput,
697 ) -> Result<ListMultipartUploadsOutput, S3Error> {
698 let bucket_name = input.bucket;
699
700 let bucket = self
701 .state
702 .get_bucket(&bucket_name)
703 .map_err(S3ServiceError::into_s3_error)?;
704
705 let prefix = input.prefix.unwrap_or_default();
706 let max_uploads = input.max_uploads.unwrap_or(1000) as usize;
707
708 let mut uploads: Vec<MultipartUpload> = bucket
709 .multipart_uploads
710 .iter()
711 .filter(|entry| entry.key.starts_with(&prefix))
712 .map(|entry| entry.value().clone())
713 .collect();
714
715 uploads.sort_by(|a, b| a.key.cmp(&b.key).then(a.initiated.cmp(&b.initiated)));
717
718 let is_truncated = uploads.len() > max_uploads;
719 let uploads_to_return = &uploads[..uploads.len().min(max_uploads)];
720
721 let s3_uploads: Vec<ModelMultipartUpload> = uploads_to_return
722 .iter()
723 .map(|u| ModelMultipartUpload {
724 checksum_algorithm: u
725 .checksum_algorithm
726 .as_ref()
727 .map(|a| ChecksumAlgorithm::from(a.as_str())),
728 checksum_type: None,
729 initiated: Some(u.initiated),
730 initiator: Some(Initiator {
731 display_name: Some(u.owner.display_name.clone()),
732 id: Some(u.owner.id.clone()),
733 }),
734 key: Some(u.key.clone()),
735 owner: Some(to_model_owner(&u.owner)),
736 storage_class: Some(StorageClass::from(u.storage_class.as_str())),
737 upload_id: Some(u.upload_id.clone()),
738 })
739 .collect();
740
741 let next_key_marker = if is_truncated {
742 s3_uploads.last().and_then(|u| u.key.clone())
743 } else {
744 None
745 };
746 let next_upload_id_marker = if is_truncated {
747 s3_uploads.last().and_then(|u| u.upload_id.clone())
748 } else {
749 None
750 };
751
752 Ok(ListMultipartUploadsOutput {
753 bucket: Some(bucket_name),
754 common_prefixes: Vec::new(),
755 delimiter: input.delimiter,
756 encoding_type: None,
757 is_truncated: Some(is_truncated),
758 key_marker: input.key_marker,
759 max_uploads: Some(max_uploads as i32),
760 next_key_marker,
761 next_upload_id_marker,
762 prefix: Some(prefix),
763 request_charged: None,
764 upload_id_marker: input.upload_id_marker,
765 uploads: s3_uploads,
766 })
767 }
768}
769
/// Per-algorithm checksum slots in the order the output structs expect them:
/// `(crc32, crc32c, crc64nvme, sha1, sha256)`.
type ChecksumOutputFields = (
    Option<String>,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<String>,
);
778
779#[allow(clippy::result_large_err)]
784fn extract_checksum_from_part(input: &UploadPartInput) -> Result<Option<ChecksumData>, S3Error> {
785 let candidates: [(&str, &Option<String>); 5] = [
786 ("CRC32", &input.checksum_crc32),
787 ("CRC32C", &input.checksum_crc32c),
788 ("CRC64NVME", &input.checksum_crc64nvme),
789 ("SHA1", &input.checksum_sha1),
790 ("SHA256", &input.checksum_sha256),
791 ];
792 let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
793 if found.len() > 1 {
794 return Err(S3ServiceError::InvalidArgument {
795 message: "Only one checksum value can be provided per request".to_owned(),
796 }
797 .into_s3_error());
798 }
799 Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
800 algorithm: (*alg).to_owned(),
801 value: val.as_ref().unwrap_or(&String::new()).clone(),
802 checksum_type: "FULL_OBJECT".to_owned(),
803 }))
804}
805
806fn checksum_to_output_fields(checksum: Option<&ChecksumData>) -> ChecksumOutputFields {
809 let Some(c) = checksum else {
810 return (None, None, None, None, None);
811 };
812 match c.algorithm.as_str() {
813 "CRC32" => (Some(c.value.clone()), None, None, None, None),
814 "CRC32C" => (None, Some(c.value.clone()), None, None, None),
815 "CRC64NVME" => (None, None, Some(c.value.clone()), None, None),
816 "SHA1" => (None, None, None, Some(c.value.clone()), None),
817 "SHA256" => (None, None, None, None, Some(c.value.clone())),
818 _ => (None, None, None, None, None),
819 }
820}
821
#[cfg(test)]
mod tests {
    use super::*;

    // These tests exercise `parse_copy_source`, which this module relies on to
    // split a copy source into (bucket, key, optional version id).

    #[test]
    fn test_should_parse_copy_source_basic() {
        let parsed = parse_copy_source("/my-bucket/my-key");
        let (bucket, key, version) = parsed.expect("source with leading slash should parse");

        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert!(version.is_none(), "no version id was supplied");
    }

    #[test]
    fn test_should_parse_copy_source_without_leading_slash() {
        let parsed = parse_copy_source("my-bucket/my-key");
        let (bucket, key, version) = parsed.expect("source without leading slash should parse");

        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert!(version.is_none(), "no version id was supplied");
    }

    #[test]
    fn test_should_parse_copy_source_with_version_id() {
        let parsed = parse_copy_source("/my-bucket/my-key?versionId=abc123");
        let (bucket, key, version) = parsed.expect("source with version id should parse");

        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "my-key");
        assert_eq!(version.as_deref(), Some("abc123"));
    }

    #[test]
    fn test_should_parse_copy_source_with_nested_key() {
        let parsed = parse_copy_source("/my-bucket/path/to/my-key?versionId=v1");
        let (bucket, key, version) = parsed.expect("source with nested key should parse");

        assert_eq!(bucket, "my-bucket");
        assert_eq!(key, "path/to/my-key");
        assert_eq!(version.as_deref(), Some("v1"));
    }

    #[test]
    fn test_should_fail_on_invalid_copy_source() {
        assert!(parse_copy_source("no-slash").is_err());
    }

    #[test]
    fn test_should_fail_on_empty_bucket() {
        assert!(parse_copy_source("/").is_err());
    }
}