Skip to main content

rustack_s3_core/ops/
multipart.rs

1//! Multipart upload operation handlers.
2//!
3//! Implements `create_multipart_upload`, `upload_part`, `upload_part_copy`,
4//! `complete_multipart_upload`, `abort_multipart_upload`, `list_parts`,
5//! and `list_multipart_uploads`.
6
7use std::str::FromStr;
8
9use chrono::Utc;
10use rustack_s3_model::{
11    error::{S3Error, S3ErrorCode},
12    input::{
13        AbortMultipartUploadInput, CompleteMultipartUploadInput, CreateMultipartUploadInput,
14        ListMultipartUploadsInput, ListPartsInput, UploadPartCopyInput, UploadPartInput,
15    },
16    output::{
17        AbortMultipartUploadOutput, CompleteMultipartUploadOutput, CreateMultipartUploadOutput,
18        ListMultipartUploadsOutput, ListPartsOutput, UploadPartCopyOutput, UploadPartOutput,
19    },
20    types::{
21        ChecksumAlgorithm, ChecksumType, CopyPartResult, Initiator,
22        MultipartUpload as ModelMultipartUpload, Part, StorageClass,
23    },
24};
25use tracing::debug;
26
27use crate::{
28    checksums::{
29        ChecksumAlgorithm as CoreChecksumAlgorithm, compute_checksum, compute_composite_checksum,
30    },
31    error::S3ServiceError,
32    provider::RustackS3,
33    state::{
34        multipart::{MultipartUpload, UploadPart},
35        object::{ChecksumData, ObjectMetadata, Owner as InternalOwner, S3Object},
36    },
37    utils::{generate_upload_id, parse_copy_source},
38    validation::{validate_content_md5, validate_object_key},
39};
40
41/// Minimum part size for multipart uploads (5 MB). All parts except the last
42/// must be at least this size per the S3 specification.
43const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;
44
45use super::bucket::to_model_owner;
46
47// AWS S3 DTOs use signed integers (i32/i64) for inherently non-negative values.
48// These handler methods must remain async for consistency.
49#[allow(
50    clippy::cast_possible_wrap,
51    clippy::cast_possible_truncation,
52    clippy::cast_sign_loss,
53    clippy::unused_async
54)]
55impl RustackS3 {
56    /// Create a new multipart upload.
57    #[allow(clippy::too_many_lines)]
58    pub async fn handle_create_multipart_upload(
59        &self,
60        input: CreateMultipartUploadInput,
61    ) -> Result<CreateMultipartUploadOutput, S3Error> {
62        let bucket_name = input.bucket;
63        let key = input.key;
64
65        validate_object_key(&key).map_err(S3ServiceError::into_s3_error)?;
66
67        let bucket = self
68            .state
69            .get_bucket(&bucket_name)
70            .map_err(S3ServiceError::into_s3_error)?;
71
72        let upload_id = generate_upload_id();
73
74        // Build metadata from the request.
75        let metadata = ObjectMetadata {
76            content_type: input.content_type.clone(),
77            content_encoding: input.content_encoding.clone(),
78            content_disposition: input.content_disposition.clone(),
79            content_language: input.content_language.clone(),
80            cache_control: input.cache_control.clone(),
81            expires: None,
82            user_metadata: input.metadata.clone(),
83            sse_algorithm: input
84                .server_side_encryption
85                .as_ref()
86                .map(|s| s.as_str().to_owned()),
87            sse_kms_key_id: input.ssekms_key_id.clone(),
88            sse_bucket_key_enabled: input.bucket_key_enabled,
89            sse_customer_algorithm: input.sse_customer_algorithm.clone(),
90            sse_customer_key_md5: input.sse_customer_key_md5.clone(),
91            tagging: input
92                .tagging
93                .as_ref()
94                .map(|t| super::object::parse_tagging_header(t))
95                .unwrap_or_default(),
96            acl: input
97                .acl
98                .as_ref()
99                .and_then(|a| a.as_str().parse().ok())
100                .unwrap_or_default(),
101            object_lock_mode: input
102                .object_lock_mode
103                .as_ref()
104                .map(|m| m.as_str().to_owned()),
105            object_lock_retain_until: input.object_lock_retain_until_date,
106            object_lock_legal_hold: input
107                .object_lock_legal_hold_status
108                .as_ref()
109                .map(|s| s.as_str() == "ON"),
110        };
111
112        let mut upload = MultipartUpload::new(
113            upload_id.clone(),
114            key.clone(),
115            InternalOwner::default(),
116            metadata,
117        );
118
119        upload.storage_class = input
120            .storage_class
121            .as_ref()
122            .map_or_else(|| "STANDARD".to_owned(), |s| s.as_str().to_owned());
123
124        upload.checksum_algorithm = input
125            .checksum_algorithm
126            .as_ref()
127            .map(|a| a.as_str().to_owned());
128
129        // Determine checksum type: CRC64NVME forces FULL_OBJECT, SHA-1/SHA-256
130        // force COMPOSITE, CRC32/CRC32C default to COMPOSITE but allow override.
131        if let Some(ref algo_str) = upload.checksum_algorithm {
132            let requested_type = input
133                .checksum_type
134                .as_ref()
135                .map(|ct| ct.as_str().to_owned());
136            upload.checksum_type = Some(match algo_str.as_str() {
137                "CRC64NVME" => "FULL_OBJECT".to_owned(),
138                "SHA1" | "SHA256" => "COMPOSITE".to_owned(),
139                _ => requested_type.unwrap_or_else(|| "COMPOSITE".to_owned()),
140            });
141        }
142
143        upload.sse_algorithm = input
144            .server_side_encryption
145            .as_ref()
146            .map(|s| s.as_str().to_owned());
147
148        upload.sse_kms_key_id.clone_from(&input.ssekms_key_id);
149
150        let output_checksum_type = upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
151            "FULL_OBJECT" => ChecksumType::FullObject,
152            _ => ChecksumType::Composite,
153        });
154
155        bucket.multipart_uploads.insert(upload_id.clone(), upload);
156
157        debug!(
158            bucket = %bucket_name,
159            key = %key,
160            upload_id = %upload_id,
161            "create_multipart_upload completed"
162        );
163
164        Ok(CreateMultipartUploadOutput {
165            abort_date: None,
166            abort_rule_id: None,
167            bucket: Some(bucket_name),
168            bucket_key_enabled: None,
169            checksum_algorithm: input.checksum_algorithm,
170            checksum_type: output_checksum_type,
171            key: Some(key),
172            request_charged: None,
173            sse_customer_algorithm: None,
174            sse_customer_key_md5: None,
175            ssekms_encryption_context: None,
176            ssekms_key_id: None,
177            server_side_encryption: None,
178            upload_id: Some(upload_id),
179        })
180    }
181
182    /// Upload a single part of a multipart upload.
183    #[allow(clippy::too_many_lines)]
184    pub async fn handle_upload_part(
185        &self,
186        mut input: UploadPartInput,
187    ) -> Result<UploadPartOutput, S3Error> {
188        // Extract checksum before moving fields out of input.
189        let part_checksum = extract_checksum_from_part(&input)?;
190
191        let bucket_name = input.bucket;
192        let key = input.key;
193        let upload_id = input.upload_id;
194        let part_number = input.part_number;
195
196        if !(1..=10_000).contains(&part_number) {
197            return Err(S3Error::invalid_argument(
198                "Part number must be between 1 and 10000",
199            ));
200        }
201
202        let bucket = self
203            .state
204            .get_bucket(&bucket_name)
205            .map_err(S3ServiceError::into_s3_error)?;
206
207        // Verify the upload exists and get its checksum algorithm.
208        let upload_checksum_algorithm = {
209            let upload_ref = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
210                S3ServiceError::NoSuchUpload {
211                    upload_id: upload_id.clone(),
212                }
213                .into_s3_error()
214            })?;
215            upload_ref.checksum_algorithm.clone()
216        };
217
218        // Collect body data.
219        let body_data = input.body.take().map(|b| b.data).unwrap_or_default();
220
221        // Validate Content-MD5 if provided.
222        validate_content_md5(input.content_md5.as_deref(), &body_data)
223            .map_err(S3ServiceError::into_s3_error)?;
224
225        // If the multipart upload has a checksum algorithm, validate the part
226        // checksum and compute server-side if not provided.
227        let checksum = if let Some(ref algo_str) = upload_checksum_algorithm {
228            if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
229                let computed = compute_checksum(algo, &body_data);
230                if let Some(ref client_cksum) = part_checksum {
231                    // Validate algorithm matches.
232                    if !client_cksum.algorithm.eq_ignore_ascii_case(algo_str) {
233                        return Err(S3ServiceError::InvalidArgument {
234                            message: format!(
235                                "Checksum algorithm mismatch: expected {algo_str}, got {}",
236                                client_cksum.algorithm
237                            ),
238                        }
239                        .into_s3_error());
240                    }
241                    // Validate value.
242                    if client_cksum.value != computed {
243                        return Err(S3ServiceError::BadDigest.into_s3_error());
244                    }
245                }
246                Some(ChecksumData {
247                    algorithm: algo_str.clone(),
248                    value: computed,
249                    checksum_type: "FULL_OBJECT".to_owned(),
250                })
251            } else {
252                part_checksum
253            }
254        } else {
255            part_checksum
256        };
257
258        // Write part to storage.
259        let write_result = self
260            .storage
261            .write_part(&bucket_name, &upload_id, part_number as u32, body_data)
262            .await
263            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
264
265        // Build checksum output fields.
266        let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
267            checksum_to_output_fields(checksum.as_ref());
268
269        // Record the part metadata.
270        let part = UploadPart {
271            part_number: part_number as u32,
272            etag: write_result.etag.clone(),
273            size: write_result.size,
274            last_modified: Utc::now(),
275            checksum,
276        };
277
278        if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
279            upload.put_part(part);
280        }
281
282        debug!(
283            bucket = %bucket_name,
284            key = %key,
285            upload_id = %upload_id,
286            part_number,
287            "upload_part completed"
288        );
289
290        Ok(UploadPartOutput {
291            bucket_key_enabled: None,
292            checksum_crc32: out_crc32,
293            checksum_crc32c: out_crc32c,
294            checksum_crc64nvme: out_crc64nvme,
295            checksum_sha1: out_sha1,
296            checksum_sha256: out_sha256,
297            e_tag: Some(write_result.etag),
298            request_charged: None,
299            sse_customer_algorithm: None,
300            sse_customer_key_md5: None,
301            ssekms_key_id: None,
302            server_side_encryption: None,
303        })
304    }
305
306    /// Upload a part by copying from an existing object.
307    pub async fn handle_upload_part_copy(
308        &self,
309        input: UploadPartCopyInput,
310    ) -> Result<UploadPartCopyOutput, S3Error> {
311        let bucket_name = input.bucket;
312        let upload_id = input.upload_id;
313        let part_number = input.part_number;
314
315        let (src_bucket, src_key, src_version_id) =
316            parse_copy_source(&input.copy_source).map_err(S3ServiceError::into_s3_error)?;
317
318        // Read source object data.
319        let src_vid = src_version_id.as_deref().unwrap_or("null");
320        let data = self
321            .storage
322            .read_object(&src_bucket, &src_key, src_vid, None)
323            .await
324            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
325
326        // Write as part.
327        let write_result = self
328            .storage
329            .write_part(&bucket_name, &upload_id, part_number as u32, data)
330            .await
331            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
332
333        // Record the part metadata.
334        let bucket = self
335            .state
336            .get_bucket(&bucket_name)
337            .map_err(S3ServiceError::into_s3_error)?;
338
339        let part = UploadPart {
340            part_number: part_number as u32,
341            etag: write_result.etag.clone(),
342            size: write_result.size,
343            last_modified: Utc::now(),
344            checksum: None,
345        };
346
347        if let Some(mut upload) = bucket.multipart_uploads.get_mut(&upload_id) {
348            upload.put_part(part);
349        }
350
351        let copy_result = CopyPartResult {
352            checksum_crc32: None,
353            checksum_crc32c: None,
354            checksum_crc64nvme: None,
355            checksum_sha1: None,
356            checksum_sha256: None,
357            e_tag: Some(write_result.etag),
358            last_modified: Some(Utc::now()),
359        };
360
361        Ok(UploadPartCopyOutput {
362            bucket_key_enabled: None,
363            copy_part_result: Some(copy_result),
364            copy_source_version_id: src_version_id,
365            request_charged: None,
366            sse_customer_algorithm: None,
367            sse_customer_key_md5: None,
368            ssekms_key_id: None,
369            server_side_encryption: None,
370        })
371    }
372
373    /// Complete a multipart upload by assembling parts into the final object.
374    #[allow(clippy::too_many_lines)]
375    pub async fn handle_complete_multipart_upload(
376        &self,
377        input: CompleteMultipartUploadInput,
378    ) -> Result<CompleteMultipartUploadOutput, S3Error> {
379        let bucket_name = input.bucket;
380        let key = input.key;
381        let upload_id = input.upload_id;
382
383        let bucket = self
384            .state
385            .get_bucket(&bucket_name)
386            .map_err(S3ServiceError::into_s3_error)?;
387
388        // Get the upload.
389        let upload = bucket
390            .multipart_uploads
391            .get(&upload_id)
392            .ok_or_else(|| {
393                S3ServiceError::NoSuchUpload {
394                    upload_id: upload_id.clone(),
395                }
396                .into_s3_error()
397            })?
398            .clone();
399
400        // Extract the requested part list.
401        let requested_parts = input
402            .multipart_upload
403            .map(|mu| mu.parts)
404            .unwrap_or_default();
405
406        // Validate parts are in order and exist.
407        let mut part_numbers: Vec<u32> = Vec::with_capacity(requested_parts.len());
408        let mut last_num = 0i32;
409
410        for cp in &requested_parts {
411            let part_num = cp.part_number.ok_or_else(|| {
412                S3Error::with_message(S3ErrorCode::InvalidArgument, "Part number is required")
413            })?;
414
415            if part_num <= last_num {
416                return Err(S3ServiceError::InvalidPartOrder.into_s3_error());
417            }
418            last_num = part_num;
419
420            let part_num_u32 = u32::try_from(part_num).map_err(|_| {
421                S3Error::with_message(S3ErrorCode::InvalidArgument, "Invalid part number")
422            })?;
423
424            // Verify the part exists in our upload record.
425            upload
426                .get_part(part_num_u32)
427                .ok_or_else(|| S3ServiceError::InvalidPart.into_s3_error())?;
428
429            part_numbers.push(part_num_u32);
430        }
431
432        // Validate minimum part size for all parts except the last one.
433        // S3 requires each part (except the final one) to be at least 5 MB.
434        if part_numbers.len() > 1 {
435            for &num in &part_numbers[..part_numbers.len() - 1] {
436                if let Some(part) = upload.get_part(num) {
437                    if part.size < MIN_PART_SIZE {
438                        return Err(S3Error::with_message(
439                            S3ErrorCode::EntityTooSmall,
440                            format!(
441                                "Your proposed upload is smaller than the minimum allowed size. \
442                                 Part {num} has size {} bytes, minimum is {MIN_PART_SIZE}",
443                                part.size
444                            ),
445                        ));
446                    }
447                }
448            }
449        }
450
451        // Determine version ID.
452        let version_id = if bucket.is_versioning_enabled() {
453            crate::utils::generate_version_id()
454        } else {
455            "null".to_owned()
456        };
457
458        // Assemble parts in storage.
459        let (write_result, _part_md5s) = self
460            .storage
461            .complete_multipart(&bucket_name, &upload_id, &key, &version_id, &part_numbers)
462            .await
463            .map_err(|e| S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error())?;
464
465        // Compute the combined checksum for the final object if the multipart
466        // upload was created with a checksum algorithm.
467        //
468        // FULL_OBJECT: the checksum of the concatenated object bytes (no `-N`
469        // suffix). AWS CLI verifies downloaded multipart objects by recomputing
470        // this over the body and comparing to the header value.
471        //
472        // COMPOSITE: the hash of concatenated per-part checksums, with `-N`
473        // suffix. Used for SHA-1/SHA-256 multipart uploads.
474        let final_checksum = if let Some(ref algo_str) = upload.checksum_algorithm {
475            if let Ok(algo) = CoreChecksumAlgorithm::from_str(algo_str) {
476                let checksum_type_str = upload.checksum_type.as_deref().unwrap_or("COMPOSITE");
477
478                let value = if checksum_type_str == "FULL_OBJECT" {
479                    let assembled = self
480                        .storage
481                        .read_object(&bucket_name, &key, &version_id, None)
482                        .await
483                        .map_err(|e| {
484                            S3ServiceError::Internal(anyhow::anyhow!("{e}")).into_s3_error()
485                        })?;
486                    compute_checksum(algo, &assembled)
487                } else {
488                    // Collect part checksums in order for composite calculation.
489                    let part_checksums: Vec<String> = part_numbers
490                        .iter()
491                        .filter_map(|&num| {
492                            upload
493                                .get_part(num)
494                                .and_then(|p| p.checksum.as_ref())
495                                .map(|c| c.value.clone())
496                        })
497                        .collect();
498                    compute_composite_checksum(algo, &part_checksums)
499                };
500
501                Some(ChecksumData {
502                    algorithm: algo_str.clone(),
503                    value,
504                    checksum_type: checksum_type_str.to_owned(),
505                })
506            } else {
507                None
508            }
509        } else {
510            None
511        };
512
513        let (out_crc32, out_crc32c, out_crc64nvme, out_sha1, out_sha256) =
514            checksum_to_output_fields(final_checksum.as_ref());
515
516        let out_checksum_type = final_checksum
517            .as_ref()
518            .map(|c| match c.checksum_type.as_str() {
519                "FULL_OBJECT" => ChecksumType::FullObject,
520                _ => ChecksumType::Composite,
521            });
522
523        // Build the final object.
524        let obj = S3Object {
525            key: key.clone(),
526            version_id: version_id.clone(),
527            etag: write_result.etag.clone(),
528            size: write_result.size,
529            last_modified: Utc::now(),
530            storage_class: upload.storage_class.clone(),
531            metadata: upload.metadata.clone(),
532            owner: upload.owner.clone(),
533            checksum: final_checksum,
534            parts_count: Some(part_numbers.len() as u32),
535            part_etags: requested_parts
536                .iter()
537                .filter_map(|p| p.e_tag.clone())
538                .collect(),
539        };
540
541        {
542            let mut store = bucket.objects.write();
543            store.put(obj);
544        }
545
546        // Remove the completed upload.
547        bucket.multipart_uploads.remove(&upload_id);
548
549        debug!(
550            bucket = %bucket_name,
551            key = %key,
552            upload_id = %upload_id,
553            parts = part_numbers.len(),
554            "complete_multipart_upload completed"
555        );
556
557        let real_version_id = if version_id == "null" {
558            None
559        } else {
560            Some(version_id)
561        };
562
563        Ok(CompleteMultipartUploadOutput {
564            bucket: Some(bucket_name.clone()),
565            bucket_key_enabled: None,
566            checksum_crc32: out_crc32,
567            checksum_crc32c: out_crc32c,
568            checksum_crc64nvme: out_crc64nvme,
569            checksum_sha1: out_sha1,
570            checksum_sha256: out_sha256,
571            checksum_type: out_checksum_type,
572            e_tag: Some(write_result.etag),
573            expiration: None,
574            key: Some(key),
575            location: Some(format!("http://s3.amazonaws.com/{bucket_name}")),
576            request_charged: None,
577            ssekms_key_id: None,
578            server_side_encryption: None,
579            version_id: real_version_id,
580        })
581    }
582
583    /// Abort a multipart upload.
584    pub async fn handle_abort_multipart_upload(
585        &self,
586        input: AbortMultipartUploadInput,
587    ) -> Result<AbortMultipartUploadOutput, S3Error> {
588        let bucket_name = input.bucket;
589        let upload_id = input.upload_id;
590
591        let bucket = self
592            .state
593            .get_bucket(&bucket_name)
594            .map_err(S3ServiceError::into_s3_error)?;
595
596        // Remove the upload metadata (idempotent: no error if already gone).
597        bucket.multipart_uploads.remove(&upload_id);
598
599        // Clean up storage parts.
600        self.storage.abort_multipart(&bucket_name, &upload_id);
601
602        debug!(
603            bucket = %bucket_name,
604            upload_id = %upload_id,
605            "abort_multipart_upload completed"
606        );
607
608        Ok(AbortMultipartUploadOutput {
609            request_charged: None,
610        })
611    }
612
613    /// List parts that have been uploaded for a multipart upload.
614    pub async fn handle_list_parts(
615        &self,
616        input: ListPartsInput,
617    ) -> Result<ListPartsOutput, S3Error> {
618        let bucket_name = input.bucket;
619        let key = input.key;
620        let upload_id = input.upload_id;
621
622        let bucket = self
623            .state
624            .get_bucket(&bucket_name)
625            .map_err(S3ServiceError::into_s3_error)?;
626
627        let upload = bucket.multipart_uploads.get(&upload_id).ok_or_else(|| {
628            S3ServiceError::NoSuchUpload {
629                upload_id: upload_id.clone(),
630            }
631            .into_s3_error()
632        })?;
633
634        let max_parts = input.max_parts.unwrap_or(1000) as usize;
635        let part_number_marker: u32 = input
636            .part_number_marker
637            .as_deref()
638            .and_then(|s| s.parse().ok())
639            .unwrap_or(0);
640
641        let all_parts: Vec<&UploadPart> = upload
642            .parts
643            .values()
644            .filter(|p| p.part_number > part_number_marker)
645            .collect();
646
647        let is_truncated = all_parts.len() > max_parts;
648        let parts_to_return = &all_parts[..all_parts.len().min(max_parts)];
649
650        let s3_parts: Vec<Part> = parts_to_return
651            .iter()
652            .map(|p| {
653                let (crc32, crc32c, crc64nvme, sha1, sha256) =
654                    checksum_to_output_fields(p.checksum.as_ref());
655                Part {
656                    checksum_crc32: crc32,
657                    checksum_crc32c: crc32c,
658                    checksum_crc64nvme: crc64nvme,
659                    checksum_sha1: sha1,
660                    checksum_sha256: sha256,
661                    e_tag: Some(p.etag.clone()),
662                    last_modified: Some(p.last_modified),
663                    part_number: Some(p.part_number as i32),
664                    size: Some(p.size as i64),
665                }
666            })
667            .collect();
668
669        let next_marker = if is_truncated {
670            s3_parts.last().and_then(|p| p.part_number)
671        } else {
672            None
673        };
674
675        let owner = to_model_owner(&upload.owner);
676
677        Ok(ListPartsOutput {
678            abort_date: None,
679            abort_rule_id: None,
680            bucket: Some(bucket_name),
681            checksum_algorithm: upload
682                .checksum_algorithm
683                .as_ref()
684                .map(|a| ChecksumAlgorithm::from(a.as_str())),
685            checksum_type: upload.checksum_type.as_ref().map(|ct| match ct.as_str() {
686                "FULL_OBJECT" => ChecksumType::FullObject,
687                _ => ChecksumType::Composite,
688            }),
689            initiator: Some(Initiator {
690                display_name: Some(upload.owner.display_name.clone()),
691                id: Some(upload.owner.id.clone()),
692            }),
693            is_truncated: Some(is_truncated),
694            key: Some(key),
695            max_parts: Some(max_parts as i32),
696            next_part_number_marker: next_marker.map(|n| n.to_string()),
697            owner: Some(owner),
698            part_number_marker: Some(part_number_marker.to_string()),
699            parts: s3_parts,
700            request_charged: None,
701            storage_class: Some(StorageClass::from(upload.storage_class.as_str())),
702            upload_id: Some(upload_id),
703        })
704    }
705
706    /// List in-progress multipart uploads for a bucket.
707    pub async fn handle_list_multipart_uploads(
708        &self,
709        input: ListMultipartUploadsInput,
710    ) -> Result<ListMultipartUploadsOutput, S3Error> {
711        let bucket_name = input.bucket;
712
713        let bucket = self
714            .state
715            .get_bucket(&bucket_name)
716            .map_err(S3ServiceError::into_s3_error)?;
717
718        let prefix = input.prefix.unwrap_or_default();
719        let max_uploads = input.max_uploads.unwrap_or(1000) as usize;
720
721        let mut uploads: Vec<MultipartUpload> = bucket
722            .multipart_uploads
723            .iter()
724            .filter(|entry| entry.key.starts_with(&prefix))
725            .map(|entry| entry.value().clone())
726            .collect();
727
728        // Sort by key then by initiated time.
729        uploads.sort_by(|a, b| a.key.cmp(&b.key).then(a.initiated.cmp(&b.initiated)));
730
731        let is_truncated = uploads.len() > max_uploads;
732        let uploads_to_return = &uploads[..uploads.len().min(max_uploads)];
733
734        let s3_uploads: Vec<ModelMultipartUpload> = uploads_to_return
735            .iter()
736            .map(|u| ModelMultipartUpload {
737                checksum_algorithm: u
738                    .checksum_algorithm
739                    .as_ref()
740                    .map(|a| ChecksumAlgorithm::from(a.as_str())),
741                checksum_type: None,
742                initiated: Some(u.initiated),
743                initiator: Some(Initiator {
744                    display_name: Some(u.owner.display_name.clone()),
745                    id: Some(u.owner.id.clone()),
746                }),
747                key: Some(u.key.clone()),
748                owner: Some(to_model_owner(&u.owner)),
749                storage_class: Some(StorageClass::from(u.storage_class.as_str())),
750                upload_id: Some(u.upload_id.clone()),
751            })
752            .collect();
753
754        let next_key_marker = if is_truncated {
755            s3_uploads.last().and_then(|u| u.key.clone())
756        } else {
757            None
758        };
759        let next_upload_id_marker = if is_truncated {
760            s3_uploads.last().and_then(|u| u.upload_id.clone())
761        } else {
762            None
763        };
764
765        Ok(ListMultipartUploadsOutput {
766            bucket: Some(bucket_name),
767            common_prefixes: Vec::new(),
768            delimiter: input.delimiter,
769            encoding_type: None,
770            is_truncated: Some(is_truncated),
771            key_marker: input.key_marker,
772            max_uploads: Some(max_uploads as i32),
773            next_key_marker,
774            next_upload_id_marker,
775            prefix: Some(prefix),
776            request_charged: None,
777            upload_id_marker: input.upload_id_marker,
778            uploads: s3_uploads,
779        })
780    }
781}
782
783/// Per-algorithm checksum output fields.
784type ChecksumOutputFields = (
785    Option<String>,
786    Option<String>,
787    Option<String>,
788    Option<String>,
789    Option<String>,
790);
791
792/// Extract a checksum from an [`UploadPartInput`] if any checksum fields are set.
793///
794/// Returns at most one checksum. If multiple checksum fields are set, returns
795/// an error.
796#[allow(clippy::result_large_err)]
797fn extract_checksum_from_part(input: &UploadPartInput) -> Result<Option<ChecksumData>, S3Error> {
798    let candidates: [(&str, &Option<String>); 5] = [
799        ("CRC32", &input.checksum_crc32),
800        ("CRC32C", &input.checksum_crc32c),
801        ("CRC64NVME", &input.checksum_crc64nvme),
802        ("SHA1", &input.checksum_sha1),
803        ("SHA256", &input.checksum_sha256),
804    ];
805    let found: Vec<_> = candidates.iter().filter(|(_, v)| v.is_some()).collect();
806    if found.len() > 1 {
807        return Err(S3ServiceError::InvalidArgument {
808            message: "Only one checksum value can be provided per request".to_owned(),
809        }
810        .into_s3_error());
811    }
812    Ok(found.into_iter().next().map(|(alg, val)| ChecksumData {
813        algorithm: (*alg).to_owned(),
814        value: val.as_ref().unwrap_or(&String::new()).clone(),
815        checksum_type: "FULL_OBJECT".to_owned(),
816    }))
817}
818
819/// Map an optional [`ChecksumData`] to individual output fields for the five
820/// supported algorithms.
821fn checksum_to_output_fields(checksum: Option<&ChecksumData>) -> ChecksumOutputFields {
822    let Some(c) = checksum else {
823        return (None, None, None, None, None);
824    };
825    match c.algorithm.as_str() {
826        "CRC32" => (Some(c.value.clone()), None, None, None, None),
827        "CRC32C" => (None, Some(c.value.clone()), None, None, None),
828        "CRC64NVME" => (None, None, Some(c.value.clone()), None, None),
829        "SHA1" => (None, None, None, Some(c.value.clone()), None),
830        "SHA256" => (None, None, None, None, Some(c.value.clone())),
831        _ => (None, None, None, None, None),
832    }
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838
839    #[test]
840    fn test_should_parse_copy_source_basic() {
841        let (bucket, key, version) = parse_copy_source("/my-bucket/my-key").unwrap();
842        assert_eq!(bucket, "my-bucket");
843        assert_eq!(key, "my-key");
844        assert!(version.is_none());
845    }
846
847    #[test]
848    fn test_should_parse_copy_source_without_leading_slash() {
849        let (bucket, key, version) = parse_copy_source("my-bucket/my-key").unwrap();
850        assert_eq!(bucket, "my-bucket");
851        assert_eq!(key, "my-key");
852        assert!(version.is_none());
853    }
854
855    #[test]
856    fn test_should_parse_copy_source_with_version_id() {
857        let (bucket, key, version) =
858            parse_copy_source("/my-bucket/my-key?versionId=abc123").unwrap();
859        assert_eq!(bucket, "my-bucket");
860        assert_eq!(key, "my-key");
861        assert_eq!(version.as_deref(), Some("abc123"));
862    }
863
864    #[test]
865    fn test_should_parse_copy_source_with_nested_key() {
866        let (bucket, key, version) =
867            parse_copy_source("/my-bucket/path/to/my-key?versionId=v1").unwrap();
868        assert_eq!(bucket, "my-bucket");
869        assert_eq!(key, "path/to/my-key");
870        assert_eq!(version.as_deref(), Some("v1"));
871    }
872
873    #[test]
874    fn test_should_fail_on_invalid_copy_source() {
875        let result = parse_copy_source("no-slash");
876        assert!(result.is_err());
877    }
878
879    #[test]
880    fn test_should_fail_on_empty_bucket() {
881        let result = parse_copy_source("/");
882        assert!(result.is_err());
883    }
884}