Skip to main content

rustack_s3_core/ops/
multipart.rs

1//! Multipart upload operation handlers.
2//!
3//! Implements `create_multipart_upload`, `upload_part`, `upload_part_copy`,
4//! `complete_multipart_upload`, `abort_multipart_upload`, `list_parts`,
5//! and `list_multipart_uploads`.
6
7use std::str::FromStr;
8
9use chrono::Utc;
10use rustack_s3_model::{
11    error::{S3Error, S3ErrorCode},
12    input::{
13        AbortMultipartUploadInput, CompleteMultipartUploadInput, CreateMultipartUploadInput,
14        ListMultipartUploadsInput, ListPartsInput, UploadPartCopyInput, UploadPartInput,
15    },
16    output::{
17        AbortMultipartUploadOutput, CompleteMultipartUploadOutput, CreateMultipartUploadOutput,
18        ListMultipartUploadsOutput, ListPartsOutput, UploadPartCopyOutput, UploadPartOutput,
19    },
20    types::{
21        ChecksumAlgorithm, ChecksumType, CopyPartResult, Initiator,
22        MultipartUpload as ModelMultipartUpload, Part, StorageClass,
23    },
24};
25use tracing::debug;
26
27use crate::{
28    checksums::{
29        ChecksumAlgorithm as CoreChecksumAlgorithm, compute_checksum, compute_composite_checksum,
30    },
31    error::S3ServiceError,
32    provider::RustackS3,
33    state::{
34        multipart::{MultipartUpload, UploadPart},
35        object::{ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
36    },
37    utils::{generate_upload_id, parse_copy_source},
38    validation::{validate_content_md5, validate_object_key},
39};
40
/// Smallest size, in bytes, that any part other than the final one may have
/// when a multipart upload is completed: 5 MiB (5 × 2^20 bytes), as required
/// by the S3 specification.
const MIN_PART_SIZE: u64 = 5 * (1 << 20);
44
45use super::bucket::to_model_owner;
46
47// AWS S3 DTOs use signed integers (i32/i64) for inherently non-negative values.
48// These handler methods must remain async for consistency.
49#[allow(
50    clippy::cast_possible_wrap,
51    clippy::cast_possible_truncation,
52    clippy::cast_sign_loss,
53    clippy::unused_async
54)]
55impl RustackS3 {
56    /// Create a new multipart upload.
57    #[allow(clippy::too_many_lines)]
58    pub async fn handle_create_multipart_upload(
59        &self,
60        input: CreateMultipartUploadInput,
61    ) -> Result<CreateMultipartUploadOutput, S3Error> {
62        let bucket_name = input.bucket;
63        let key = input.key;
64
65        validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
66
67        let bucket = self
68            .state
69            .get_bucket(&bucket_name)
70            .map_err(S3ServiceError::into_s3_error)?;
71
72        let upload_id = generate_upload_id();
73
74        // Build metadata from the request.
75        let metadata = ObjectMetadata {
76            content_type: input.content_type.clone(),
77            content_encoding: input.content_encoding.clone(),
78            content_disposition: input.content_disposition.clone(),
79            content_language: input.content_language.clone(),
80            cache_control: input.cache_control.clone(),
81            expires: None,
82            user_metadata: input.metadata.clone(),
83            sse_algorithm: input
84                .server_side_encryption
85                .as_ref()
86                .map(|s| s.as_str().to_owned()),
87            sse_kms_key_id: input.ssekms_key_id.clone(),
88            sse_bucket_key_enabled: input.bucket_key_enabled,
89            sse_customer_algorithm: input.sse_customer_algorithm.clone(),
90            sse_customer_key_md5: input.sse_customer_key_md5.clone(),
91            tagging: input
92                .tagging
93                .as_ref()
94                .map(|t| super::object::parse_tagging_header(t))
95                .unwrap_or_default(),
96            acl: input
97                .acl
98                .as_ref()
99                .and_then(|a| a.as_str().parse().ok())
100                .unwrap_or_default(),
101            object_lock_mode: input
102                .object_lock_mode
103                .as_ref()
104                .map(|m| m.as_str().to_owned()),
105            object_lock_retain_until: input.object_lock_retain_until_date,
106            object_lock_legal_hold: input
107                .object_lock_legal_hold_status
108                .as_ref()
109                .map(|s| s.as_str() == "ON"),
110        };
111
112        let mut upload = MultipartUpload::new(
113            upload_id.clone(),
114            key.clone(),
115            InternalOwner::default(),
116            metadata,
117        );
118
119        upload.storage_class = input
120            .storage_class
121            .as_ref()
122            .map_or_else(|| "STANDARD".to_owned(), |s| s.as_str().to_owned());
123
124        upload.checksum_algorithm = input
125            .checksum_algorithm
126            .as_ref()
127            .map(|a| a.as_str().to_owned());
128
129        // Determine checksum type: CRC64NVME forces FULL_OBJECT, SHA-1/SHA-256
130        // force COMPOSITE, CRC32/CRC32C default to COMPOSITE but allow override.
131        if let Some(ref algo_str) = upload.checksum_algorithm {
132            let requested_type = input
133                .checksum_type
134                .as_ref()
135                .map(|ct| ct.as_str().to_owned());
136            upload.checksum_type = Some(match algo_str.as_str() {
137                "CRC64NVME" => "FULL_OBJECT".to_owned(),
138                "SHA1" | "SHA256" => "COMPOSITE".to_owned(),
139                _ => requested_type.unwrap_or_else(|| "COMPOSITE".to_owned()),
140            });
141        }
142
143        upload.sse_algorithm = input
144            .server_side_encryption
145            .as_ref()
146            .map(|s| s.as_str().to_owned());
147
148        upload.sse_kms_key_id.clone_from(&input.ssekms_key_id);
149
150        let output_checksum_type = upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
151            "FULL_OBJECT" => ChecksumType::FullObject,
152            _ => ChecksumType::Composite,
153        });
154
155        bucket.multipart_uploads.insert(upload_id.clone(), upload);
156
157        debug!(
158            bucket = %bucket_name,
159            key = %key,
160            upload_id = %upload_id,
161            "create_multipart_upload completed"
162        );
163
164        Ok(CreateMultipartUploadOutput {
165            abort_date: None,
166            abort_rule_id: None,
167            bucket: Some(bucket_name),
168            bucket_key_enabled: None,
169            checksum_algorithm: input.checksum_algorithm,
170            checksum_type: output_checksum_type,
171            key: Some(key),
172            request_charged: None,
173            sse_customer_algorithm: None,
174            sse_customer_key_md5: None,
175            ssekms_encryption_context: None,
176            ssekms_key_id: None,
177            server_side_encryption: None,
178            upload_id: Some(upload_id),
179        })
180    }
181
182    /// Upload a single part of a multipart upload.
183    #[allow(clippy::too_many_lines)]
184    pub async fn handle_upload_part(
185        &self,
186        mut input: UploadPartInput,
187    ) -> Result<UploadPartOutput, S3Error> {
188        // Extract checksum before moving fields out of input.
189        let part_checksum = extract_checksum_from_part(&input)?;
190
191        let bucket_name = input.bucket;
192        let key = input.key;
193        let upload_id = input.upload_id;
194        let part_number = input.part_number;
195
196        if !(1..=10_000).contains(&part_number) {
197            return Err(S3Error::invalid_argument(
198                "Part number must be between 1 and 10000",
199            ));
200        }
201
202        let bucket = self
203            .state
204            .get_bucket(&bucket_name)
205            .map_err(S3ServiceError::into_s3_error)?;
206
207        // Verify the upload exists and get its checksum algorithm.
208        let upload_checksum_algorithm = {
209            let upload_ref = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
210                S3ServiceError::NoSuchUpload {
211                    upload_id: upload_id.clone(),
212                }
213                .into_s3_error()
214            })?;
215            upload_ref.checksum_algorithm.clone()
216        };
217
218        // Collect body data.
219        let body_data = input.body.take().map(|b| b.data).unwrap_or_default();
220
221        // Validate Content-MD5 if provided.
222        validate_content_md5(input.content_md5.as_deref(), &body_data)
223            .map_err(S3ServiceError::into_s3_error)?;
224
225        // If the multipart upload has a checksum algorithm, validate the part
226        // checksum and compute server-side if not provided.
227        let checksum = if let Some(ref algo_str) = upload_checksum_algorithm {
228            if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
229                let computed = compute_checksum(algo, &body_data);
230                if let Some(ref client_cksum) = part_checksum {
231                    // Validate algorithm matches.
232                    if !client_cksum.algorithm.eq_ignore_ascii_case(algo_str) {
233                        return Err(S3ServiceError::InvalidArgument {
234                            message: format!(
235                                "Checksum algorithm mismatch: expected {algo_str}, got {}",
236                                client_cksum.algorithm
237                            ),
238                        }
239                        .into_s3_error());
240                    }
241                    // Validate value.
242                    if client_cksum.value != computed {
243                        return Err(S3ServiceError::BadDigest.into_s3_error());
244                    }
245                }
246                Some(ChecksumData {
247                    algorithm: algo_str.clone(),
248                    value: computed,
249                    checksum_type: "FULL_OBJECT".to_owned(),
250                })
251            } else {
252                part_checksum
253            }
254        } else {
255            part_checksum
256        };
257
258        // Write part to storage.
259        let write_result = self
260            .storage
261            .write_part(&bucket_name, &upload_id, part_number as u32, body_data)
262            .await
263            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
264
265        // Build checksum output fields.
266        let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
267            checksum_to_output_fields(checksum.as_ref());
268
269        // Record the part metadata.
270        let part = UploadPart {
271            part_number: part_number as u32,
272            etag: write_result.etag.clone(),
273            size: write_result.size,
274            last_modified: Utc::now(),
275            checksum,
276        };
277
278        if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
279            upload.put_part(part);
280        }
281
282        debug!(
283            bucket = %bucket_name,
284            key = %key,
285            upload_id = %upload_id,
286            part_number,
287            "upload_part completed"
288        );
289
290        Ok(UploadPartOutput {
291            bucket_key_enabled: None,
292            checksum_crc32: out_crc32,
293            checksum_crc32c: out_crc32c,
294            checksum_crc64nvme: out_crc64nvme,
295            checksum_sha1: out_sha1,
296            checksum_sha256: out_sha256,
297            e_tag: Some(write_result.etag),
298            request_charged: None,
299            sse_customer_algorithm: None,
300            sse_customer_key_md5: None,
301            ssekms_key_id: None,
302            server_side_encryption: None,
303        })
304    }
305
306    /// Upload a part by copying from an existing object.
307    pub async fn handle_upload_part_copy(
308        &self,
309        input: UploadPartCopyInput,
310    ) -> Result<UploadPartCopyOutput, S3Error> {
311        let bucket_name = input.bucket;
312        let upload_id = input.upload_id;
313        let part_number = input.part_number;
314
315        let (src_bucket, src_key, src_version_id) =
316            parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
317
318        // Read source object data.
319        let src_vid = src_version_id.as_deref().unwrap_or("null");
320        let data = self
321            .storage
322            .read_object(&src_bucket, &src_key, src_vid, None)
323            .await
324            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
325
326        // Write as part.
327        let write_result = self
328            .storage
329            .write_part(&bucket_name, &upload_id, part_number as u32, data)
330            .await
331            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
332
333        // Record the part metadata.
334        let bucket = self
335            .state
336            .get_bucket(&bucket_name)
337            .map_err(S3ServiceError::into_s3_error)?;
338
339        let part = UploadPart {
340            part_number: part_number as u32,
341            etag: write_result.etag.clone(),
342            size: write_result.size,
343            last_modified: Utc::now(),
344            checksum: None,
345        };
346
347        if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
348            upload.put_part(part);
349        }
350
351        let copy_result = CopyPartResult {
352            checksum_crc32: None,
353            checksum_crc32c: None,
354            checksum_crc64nvme: None,
355            checksum_sha1: None,
356            checksum_sha256: None,
357            e_tag: Some(write_result.etag),
358            last_modified: Some(Utc::now()),
359        };
360
361        Ok(UploadPartCopyOutput {
362            bucket_key_enabled: None,
363            copy_part_result: Some(copy_result),
364            copy_source_version_id: src_version_id,
365            request_charged: None,
366            sse_customer_algorithm: None,
367            sse_customer_key_md5: None,
368            ssekms_key_id: None,
369            server_side_encryption: None,
370        })
371    }
372
373    /// Complete a multipart upload by assembling parts into the final object.
374    #[allow(clippy::too_many_lines)]
375    pub async fn handle_complete_multipart_upload(
376        &self,
377        input: CompleteMultipartUploadInput,
378    ) -> Result<CompleteMultipartUploadOutput, S3Error> {
379        let bucket_name = input.bucket;
380        let key = input.key;
381        let upload_id = input.upload_id;
382
383        let bucket = self
384            .state
385            .get_bucket(&bucket_name)
386            .map_err(S3ServiceError::into_s3_error)?;
387
388        // Get the upload.
389        let upload = bucket
390            .multipart_uploads
391            .get(&upload_id)
392            .ok_or_else(|| {
393                S3ServiceError::NoSuchUpload {
394                    upload_id: upload_id.clone(),
395                }
396                .into_s3_error()
397            })?
398            .clone();
399
400        // Extract the requested part list.
401        let requested_parts = input
402            .multipart_upload
403            .map(|mu| mu.parts)
404            .unwrap_or_default();
405
406        // Validate parts are in order and exist.
407        let mut part_numbers: Vec<u32> = Vec::with_capacity(requested_parts.len());
408        let mut last_num = 0i32;
409
410        for cp in &requested_parts {
411            let part_num = cp.part_number.ok_or_else(|| {
412                S3Error::with_message(S3ErrorCode::InvalidArgument, "Part number is required")
413            })?;
414
415            if part_num <= last_num {
416                return Err(S3ServiceError::InvalidPartOrder.into_s3_error());
417            }
418            last_num = part_num;
419
420            let part_num_u32 = u32::try_from(part_num).map_err(|_| {
421                S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid part number")
422            })?;
423
424            // Verify the part exists in our upload record.
425            upload
426                .get_part(part_num_u32)
427                .ok_or_else(|| S3ServiceError::InvalidPart.into_s3_error())?;
428
429            part_numbers.push(part_num_u32);
430        }
431
432        // Validate minimum part size for all parts except the last one.
433        // S3 requires each part (except the final one) to be at least 5 MB.
434        if part_numbers.len() > 1 {
435            for &num in &part_numbers[..part_numbers.len() - 1] {
436                if let Some(part) = upload.get_part(num) {
437                    if part.size < MIN_PART_SIZE {
438                        return Err(S3Error::with_message(
439                            S3ErrorCode::EntityTooSmall,
440                            format!(
441                                "Your proposed upload is smaller than the minimum allowed size. \
442                                 Part {num} has size {} bytes, minimum is {MIN_PART_SIZE}",
443                                part.size
444                            ),
445                        ));
446                    }
447                }
448            }
449        }
450
451        // Determine version ID.
452        let version_id = if bucket.is_versioning_enabled() {
453            crate::utils::generate_version_id()
454        } else {
455            "null".to_owned()
456        };
457
458        // Assemble parts in storage.
459        let (write_result, _part_md5s) = self
460            .storage
461            .complete_multipart(&bucket_name, &upload_id, &key, &version_id, &part_numbers)
462            .await
463            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
464
465        // Compute the combined checksum for the final object if the multipart
466        // upload was created with a checksum algorithm.
467        let final_checksum = if let Some(ref algo_str) = upload.checksum_algorithm {
468            if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
469                let checksum_type_str = upload.checksum_type.as_deref().unwrap_or("COMPOSITE");
470
471                // Collect part checksums in order.
472                let part_checksums: Vec<String> = part_numbers
473                    .iter()
474                    .filter_map(|&num| {
475                        upload
476                            .get_part(num)
477                            .and_then(|p| p.checksum.as_ref())
478                            .map(|c| c.value.clone())
479                    })
480                    .collect();
481
482                // Compute the composite checksum (hash of concatenated part
483                // checksums with `-N` suffix). True FULL_OBJECT CRC combination
484                // using GF(2) math is not yet implemented; composite is used as
485                // a fallback for all checksum types.
486                let value = compute_composite_checksum(algo, &part_checksums);
487
488                Some(ChecksumData {
489                    algorithm: algo_str.clone(),
490                    value,
491                    checksum_type: checksum_type_str.to_owned(),
492                })
493            } else {
494                None
495            }
496        } else {
497            None
498        };
499
500        let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
501            checksum_to_output_fields(final_checksum.as_ref());
502
503        let out_checksum_type = final_checksum
504            .as_ref()
505            .map(|c| match c.checksum_type.as_str() {
506                "FULL_OBJECT" => ChecksumType::FullObject,
507                _ => ChecksumType::Composite,
508            });
509
510        // Build the final object.
511        let obj = S3Object {
512            key: key.clone(),
513            version_id: version_id.clone(),
514            etag: write_result.etag.clone(),
515            size: write_result.size,
516            last_modified: Utc::now(),
517            storage_class: upload.storage_class.clone(),
518            metadata: upload.metadata.clone(),
519            owner: upload.owner.clone(),
520            checksum: final_checksum,
521            parts_count: Some(part_numbers.len() as u32),
522            part_etags: requested_parts
523                .iter()
524                .filter_map(|p| p.e_tag.clone())
525                .collect(),
526        };
527
528        {
529            let mut store = bucket.objects.write();
530            store.put(obj);
531        }
532
533        // Remove the completed upload.
534        bucket.multipart_uploads.remove(&upload_id);
535
536        debug!(
537            bucket = %bucket_name,
538            key = %key,
539            upload_id = %upload_id,
540            parts = part_numbers.len(),
541            "complete_multipart_upload completed"
542        );
543
544        let real_version_id = if version_id == "null" {
545            None
546        } else {
547            Some(version_id)
548        };
549
550        Ok(CompleteMultipartUploadOutput {
551            bucket: Some(bucket_name.clone()),
552            bucket_key_enabled: None,
553            checksum_crc32: out_crc32,
554            checksum_crc32c: out_crc32c,
555            checksum_crc64nvme: out_crc64nvme,
556            checksum_sha1: out_sha1,
557            checksum_sha256: out_sha256,
558            checksum_type: out_checksum_type,
559            e_tag: Some(write_result.etag),
560            expiration: None,
561            key: Some(key),
562            location: Some(format!("http://s3.amazonaws.com/{bucket_name}")),
563            request_charged: None,
564            ssekms_key_id: None,
565            server_side_encryption: None,
566            version_id: real_version_id,
567        })
568    }
569
570    /// Abort a multipart upload.
571    pub async fn handle_abort_multipart_upload(
572        &self,
573        input: AbortMultipartUploadInput,
574    ) -> Result<AbortMultipartUploadOutput, S3Error> {
575        let bucket_name = input.bucket;
576        let upload_id = input.upload_id;
577
578        let bucket = self
579            .state
580            .get_bucket(&bucket_name)
581            .map_err(S3ServiceError::into_s3_error)?;
582
583        // Remove the upload metadata (idempotent: no error if already gone).
584        bucket.multipart_uploads.remove(&upload_id);
585
586        // Clean up storage parts.
587        self.storage.abort_multipart(&bucket_name, &upload_id);
588
589        debug!(
590            bucket = %bucket_name,
591            upload_id = %upload_id,
592            "abort_multipart_upload completed"
593        );
594
595        Ok(AbortMultipartUploadOutput {
596            request_charged: None,
597        })
598    }
599
600    /// List parts that have been uploaded for a multipart upload.
601    pub async fn handle_list_parts(
602        &self,
603        input: ListPartsInput,
604    ) -> Result<ListPartsOutput, S3Error> {
605        let bucket_name = input.bucket;
606        let key = input.key;
607        let upload_id = input.upload_id;
608
609        let bucket = self
610            .state
611            .get_bucket(&bucket_name)
612            .map_err(S3ServiceError::into_s3_error)?;
613
614        let upload = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
615            S3ServiceError::NoSuchUpload {
616                upload_id: upload_id.clone(),
617            }
618            .into_s3_error()
619        })?;
620
621        let max_parts = input.max_parts.unwrap_or(1000) as usize;
622        let part_number_marker: u32 = input
623            .part_number_marker
624            .as_deref()
625            .and_then(|s| s.parse().ok())
626            .unwrap_or(0);
627
628        let all_parts: Vec<&UploadPart> = upload
629            .parts
630            .values()
631            .filter(|p| p.part_number > part_number_marker)
632            .collect();
633
634        let is_truncated = all_parts.len() > max_parts;
635        let parts_to_return = &all_parts[..all_parts.len().min(max_parts)];
636
637        let s3_parts: Vec<Part> = parts_to_return
638            .iter()
639            .map(|p| {
640                let (crc32, crc32c, crc64nvme, sha1, sha256) =
641                    checksum_to_output_fields(p.checksum.as_ref());
642                Part {
643                    checksum_crc32: crc32,
644                    checksum_crc32c: crc32c,
645                    checksum_crc64nvme: crc64nvme,
646                    checksum_sha1: sha1,
647                    checksum_sha256: sha256,
648                    e_tag: Some(p.etag.clone()),
649                    last_modified: Some(p.last_modified),
650                    part_number: Some(p.part_number as i32),
651                    size: Some(p.size as i64),
652                }
653            })
654            .collect();
655
656        let next_marker = if is_truncated {
657            s3_parts.last().and_then(|p| p.part_number)
658        } else {
659            None
660        };
661
662        let owner = to_model_owner(&upload.owner);
663
664        Ok(ListPartsOutput {
665            abort_date: None,
666            abort_rule_id: None,
667            bucket: Some(bucket_name),
668            checksum_algorithm: upload
669                .checksum_algorithm
670                .as_ref()
671                .map(|a| ChecksumAlgorithm::from(a.as_str())),
672            checksum_type: upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
673                "FULL_OBJECT" => ChecksumType::FullObject,
674                _ => ChecksumType::Composite,
675            }),
676            initiator: Some(Initiator {
677                display_name: Some(upload.owner.display_name.clone()),
678                id: Some(upload.owner.id.clone()),
679            }),
680            is_truncated: Some(is_truncated),
681            key: Some(key),
682            max_parts: Some(max_parts as i32),
683            next_part_number_marker: next_marker.map(|n| n.to_string()),
684            owner: Some(owner),
685            part_number_marker: Some(part_number_marker.to_string()),
686            parts: s3_parts,
687            request_charged: None,
688            storage_class: Some(StorageClass::from(upload.storage_class.as_str())),
689            upload_id: Some(upload_id),
690        })
691    }
692
693    /// List in-progress multipart uploads for a bucket.
694    pub async fn handle_list_multipart_uploads(
695        &self,
696        input: ListMultipartUploadsInput,
697    ) -> Result<ListMultipartUploadsOutput, S3Error> {
698        let bucket_name = input.bucket;
699
700        let bucket = self
701            .state
702            .get_bucket(&bucket_name)
703            .map_err(S3ServiceError::into_s3_error)?;
704
705        let prefix = input.prefix.unwrap_or_default();
706        let max_uploads = input.max_uploads.unwrap_or(1000) as usize;
707
708        let mut uploads: Vec<MultipartUpload> = bucket
709            .multipart_uploads
710            .iter()
711            .filter(|entry| entry.key.starts_with(&prefix))
712            .map(|entry| entry.value().clone())
713            .collect();
714
715        // Sort by key then by initiated time.
716        uploads.sort_by(|a, b| a.key.cmp(&b.key).then(a.initiated.cmp(&b.initiated)));
717
718        let is_truncated = uploads.len() > max_uploads;
719        let uploads_to_return = &uploads[..uploads.len().min(max_uploads)];
720
721        let s3_uploads: Vec<ModelMultipartUpload> = uploads_to_return
722            .iter()
723            .map(|u| ModelMultipartUpload {
724                checksum_algorithm: u
725                    .checksum_algorithm
726                    .as_ref()
727                    .map(|a| ChecksumAlgorithm::from(a.as_str())),
728                checksum_type: None,
729                initiated: Some(u.initiated),
730                initiator: Some(Initiator {
731                    display_name: Some(u.owner.display_name.clone()),
732                    id: Some(u.owner.id.clone()),
733                }),
734                key: Some(u.key.clone()),
735                owner: Some(to_model_owner(&u.owner)),
736                storage_class: Some(StorageClass::from(u.storage_class.as_str())),
737                upload_id: Some(u.upload_id.clone()),
738            })
739            .collect();
740
741        let next_key_marker = if is_truncated {
742            s3_uploads.last().and_then(|u| u.key.clone())
743        } else {
744            None
745        };
746        let next_upload_id_marker = if is_truncated {
747            s3_uploads.last().and_then(|u| u.upload_id.clone())
748        } else {
749            None
750        };
751
752        Ok(ListMultipartUploadsOutput {
753            bucket: Some(bucket_name),
754            common_prefixes: Vec::new(),
755            delimiter: input.delimiter,
756            encoding_type: None,
757            is_truncated: Some(is_truncated),
758            key_marker: input.key_marker,
759            max_uploads: Some(max_uploads as i32),
760            next_key_marker,
761            next_upload_id_marker,
762            prefix: Some(prefix),
763            request_charged: None,
764            upload_id_marker: input.upload_id_marker,
765            uploads: s3_uploads,
766        })
767    }
768}
769
/// Per-algorithm checksum output fields.
///
/// The slots are, in order: CRC32, CRC32C, CRC64NVME, SHA1, SHA256. This is
/// the order in which `checksum_to_output_fields` fills the tuple and in
/// which the handlers destructure it. At most one slot is `Some`.
type ChecksumOutputFields = (
    Option<String>,
    Option<String>,
    Option<String>,
    Option<String>,
    Option<String>,
);
778
779/// Extract a checksum from an [`UploadPartInput`] if any checksum fields are set.
780///
781/// Returns at most one checksum. If multiple checksum fields are set, returns
782/// an error.
783#[allow(clippy::result_large_err)]
784fn extract_checksum_from_part(input: &UploadPartInput) -> Result<Option<ChecksumData>, S3Error> {
785    let candidates: [(&str, &Option<String>); 5] = [
786        ("CRC32", &input.checksum_crc32),
787        ("CRC32C", &input.checksum_crc32c),
788        ("CRC64NVME", &input.checksum_crc64nvme),
789        ("SHA1", &input.checksum_sha1),
790        ("SHA256", &input.checksum_sha256),
791    ];
792    let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
793    if found.len() > 1 {
794        return Err(S3ServiceError::InvalidArgument {
795            message: "Only one checksum value can be provided per request".to_owned(),
796        }
797        .into_s3_error());
798    }
799    Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
800        algorithm: (*alg).to_owned(),
801        value: val.as_ref().unwrap_or(&String::new()).clone(),
802        checksum_type: "FULL_OBJECT".to_owned(),
803    }))
804}
805
806/// Map an optional [`ChecksumData`] to individual output fields for the five
807/// supported algorithms.
808fn checksum_to_output_fields(checksum: Option<&ChecksumData>) -> ChecksumOutputFields {
809    let Some(c) = checksum else {
810        return (None, None, None, None, None);
811    };
812    match c.algorithm.as_str() {
813        "CRC32" => (Some(c.value.clone()), None, None, None, None),
814        "CRC32C" => (None, Some(c.value.clone()), None, None, None),
815        "CRC64NVME" => (None, None, Some(c.value.clone()), None, None),
816        "SHA1" => (None, None, None, Some(c.value.clone()), None),
817        "SHA256" => (None, None, None, None, Some(c.value.clone())),
818        _ => (None, None, None, None, None),
819    }
820}
821
#[cfg(test)]
mod tests {
    use super::*;

    /// A leading slash is accepted; no version id is reported when absent.
    #[test]
    fn test_should_parse_copy_source_basic() {
        let parsed = parse_copy_source("/my-bucket/my-key").unwrap();
        assert_eq!(parsed.0, "my-bucket");
        assert_eq!(parsed.1, "my-key");
        assert!(parsed.2.is_none());
    }

    /// The leading slash is optional.
    #[test]
    fn test_should_parse_copy_source_without_leading_slash() {
        let parsed = parse_copy_source("my-bucket/my-key").unwrap();
        assert_eq!(parsed.0, "my-bucket");
        assert_eq!(parsed.1, "my-key");
        assert!(parsed.2.is_none());
    }

    /// A `versionId` query parameter is split off the key.
    #[test]
    fn test_should_parse_copy_source_with_version_id() {
        let parsed = parse_copy_source("/my-bucket/my-key?versionId=abc123").unwrap();
        assert_eq!(parsed.0, "my-bucket");
        assert_eq!(parsed.1, "my-key");
        assert_eq!(parsed.2.as_deref(), Some("abc123"));
    }

    /// Only the first path segment is the bucket; the rest is the key.
    #[test]
    fn test_should_parse_copy_source_with_nested_key() {
        let parsed = parse_copy_source("/my-bucket/path/to/my-key?versionId=v1").unwrap();
        assert_eq!(parsed.0, "my-bucket");
        assert_eq!(parsed.1, "path/to/my-key");
        assert_eq!(parsed.2.as_deref(), Some("v1"));
    }

    /// A source without a bucket/key separator is rejected.
    #[test]
    fn test_should_fail_on_invalid_copy_source() {
        assert!(parse_copy_source("no-slash").is_err());
    }

    /// A lone slash names neither bucket nor key and is rejected.
    #[test]
    fn test_should_fail_on_empty_bucket() {
        assert!(parse_copy_source("/").is_err());
    }
}
871}