// s3sync/storage/s3/upload_manager.rs
1use anyhow::{Context, Result, anyhow};
2use async_channel::Sender;
3use aws_sdk_s3::Client;
4use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput;
5use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
6use aws_sdk_s3::operation::get_object::GetObjectOutput;
7use aws_sdk_s3::operation::put_object::PutObjectOutput;
8use aws_sdk_s3::operation::put_object::builders::PutObjectOutputBuilder;
9use aws_sdk_s3::primitives::ByteStream;
10use aws_sdk_s3::primitives::{DateTime, DateTimeFormat};
11use aws_sdk_s3::types::{
12    ChecksumAlgorithm, ChecksumType, CompletedMultipartUpload, CompletedPart, MetadataDirective,
13    ObjectPart, RequestPayer, ServerSideEncryption, StorageClass, TaggingDirective,
14};
15use aws_smithy_types_convert::date_time::DateTimeExt;
16use base64::{Engine as _, engine::general_purpose};
17use chrono::SecondsFormat;
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Mutex};
21use tokio::io::AsyncReadExt;
22use tokio::task;
23use tokio::task::JoinHandle;
24use tracing::{debug, error, trace, warn};
25
26use crate::config::Config;
27use crate::storage;
28use crate::storage::e_tag_verify::{generate_e_tag_hash, is_multipart_upload_e_tag};
29use crate::storage::{
30    Storage, convert_copy_to_put_object_output, convert_copy_to_upload_part_output,
31    convert_to_buf_byte_stream_with_callback, get_range_from_content_range,
32    parse_range_header_string,
33};
34use crate::types::SyncStatistics::{ChecksumVerified, ETagVerified, SyncWarning};
35use crate::types::error::S3syncError;
36use crate::types::event_callback::{EventData, EventType};
37use crate::types::preprocess_callback::{UploadMetadata, is_callback_cancelled};
38use crate::types::token::PipelineCancellationToken;
39use crate::types::{
40    S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY, SyncStatistics,
41};
42
// Help text appended to multipart ETag-mismatch warnings: the mismatch may be
// caused by a differing multipart_threshold/multipart_chunksize at source-upload
// time rather than real corruption, so point the user at the relevant options.
// NOTE: each `\` at end-of-line strips the next line's leading whitespace, so the
// separating space lives at the end of the preceding line (before the `\`).
const MISMATCH_WARNING_WITH_HELP: &str = "mismatch. object in the target storage may be corrupted. \
 or the current multipart_threshold or multipart_chunksize may be different when uploading to the source. \
 To suppress this warning, please add --disable-multipart-verify command line option. \
 To resolve this issue, please add --auto-chunksize command line option(but extra API overheads).";
47
/// An uploaded part's MD5 digest paired with its part number — presumably the
/// building block for reconstructing a multipart-upload ETag (MD5 over the
/// concatenated part digests); confirm against the e_tag_verify module.
// NOTE(review): name is missing an `l` ("Mutipart" vs "Multipart"); it is
// public API, so renaming here would be a breaking change.
pub struct MutipartEtags {
    // Raw MD5 digest bytes of one uploaded part.
    pub digest: Vec<u8>,
    // S3 part number (parts are numbered starting at 1 elsewhere in this file).
    pub part_number: i32,
}
/// Orchestrates uploading one object to the target S3 bucket: picks single-part
/// vs. multipart transfer, applies metadata overrides, and performs post-upload
/// ETag / additional-checksum verification.
pub struct UploadManager {
    // S3 client for the target storage.
    client: Arc<Client>,
    config: Config,
    // Set when requests must declare a request payer (requester-pays buckets).
    request_payer: Option<RequestPayer>,
    // Checked between part uploads so a pipeline shutdown stops the transfer.
    cancellation_token: PipelineCancellationToken,
    // Channel for reporting sync statistics and warnings.
    stats_sender: Sender<SyncStatistics>,
    // Tag set to apply to the uploaded object, if any.
    tagging: Option<String>,
    // Source object's part layout; consulted when auto-chunksize is enabled.
    object_parts: Option<Vec<ObjectPart>>,
    // Accumulator for per-part MD5 digests (note: "concatnated" is a typo of
    // "concatenated", kept as-is since renaming is out of scope here).
    concatnated_md5_hash: Vec<u8>,
    // True when the target is S3 Express One Zone; disables ETag verification.
    express_onezone_storage: bool,
    // Handle to the source storage, used to fetch parts 2..N during multipart.
    source: Storage,
    source_key: String,
    // Full object size in bytes; drives the multipart/single-part decision.
    source_total_size: u64,
    // Source-side additional checksum, compared after upload when enabled.
    source_additional_checksum: Option<String>,
    // Conditional headers forwarded to complete_multipart_upload.
    if_match: Option<String>,
    if_none_match: Option<String>,
    copy_source_if_match: Option<String>,
    // Set to true whenever a verification warning is emitted.
    has_warning: Arc<AtomicBool>,
}
71
72impl UploadManager {
73    #[allow(clippy::too_many_arguments)]
74    pub fn new(
75        client: Arc<Client>,
76        config: Config,
77        request_payer: Option<RequestPayer>,
78        cancellation_token: PipelineCancellationToken,
79        stats_sender: Sender<SyncStatistics>,
80        tagging: Option<String>,
81        object_parts: Option<Vec<ObjectPart>>,
82        express_onezone_storage: bool,
83        source: Storage,
84        source_key: String,
85        source_total_size: u64,
86        source_additional_checksum: Option<String>,
87        if_match: Option<String>,
88        if_none_match: Option<String>,
89        copy_source_if_match: Option<String>,
90        has_warning: Arc<AtomicBool>,
91    ) -> Self {
92        UploadManager {
93            client,
94            config,
95            request_payer,
96            cancellation_token,
97            stats_sender,
98            tagging,
99            object_parts,
100            concatnated_md5_hash: vec![],
101            express_onezone_storage,
102            source,
103            source_key,
104            source_total_size,
105            source_additional_checksum,
106            if_match,
107            if_none_match,
108            copy_source_if_match,
109            has_warning,
110        }
111    }
112
113    pub async fn upload(
114        &mut self,
115        bucket: &str,
116        key: &str,
117        mut get_object_output_first_chunk: GetObjectOutput,
118    ) -> Result<PutObjectOutput> {
119        get_object_output_first_chunk = self.modify_metadata(get_object_output_first_chunk);
120
121        if self.is_auto_chunksize_enabled() {
122            if is_multipart_upload_e_tag(
123                &get_object_output_first_chunk
124                    .e_tag()
125                    .map(|e_tag| e_tag.to_string()),
126            ) {
127                return self
128                    .upload_with_auto_chunksize(bucket, key, get_object_output_first_chunk)
129                    .await;
130            }
131
132            let first_chunk_size = self
133                .object_parts
134                .as_ref()
135                .unwrap()
136                .first()
137                .unwrap()
138                .size()
139                .unwrap();
140            if self.source_total_size != first_chunk_size as u64 {
141                return Err(anyhow!(format!(
142                    "source_total_size does not match the first object part size: \
143                     source_total_size = {}, first object part size = {}",
144                    self.source_total_size, first_chunk_size
145                )));
146            }
147
148            // If auto-chunksize is enabled but the ETag is not a multipart upload ETag, it should be a single part upload.
149            let put_object_output = self
150                .singlepart_upload(bucket, key, get_object_output_first_chunk)
151                .await?;
152            trace!(key = key, "{put_object_output:?}");
153            return Ok(put_object_output);
154        }
155
156        let put_object_output = if self
157            .config
158            .transfer_config
159            .is_multipart_upload_required(self.source_total_size)
160        {
161            self.multipart_upload(bucket, key, get_object_output_first_chunk)
162                .await?
163        } else {
164            self.singlepart_upload(bucket, key, get_object_output_first_chunk)
165                .await?
166        };
167
168        trace!(key = key, "{put_object_output:?}");
169        Ok(put_object_output)
170    }
171
172    fn modify_metadata(&self, mut get_object_output: GetObjectOutput) -> GetObjectOutput {
173        if self.config.no_sync_system_metadata {
174            get_object_output = Self::clear_system_meta_data(get_object_output);
175        }
176
177        if self.config.no_sync_user_defined_metadata {
178            get_object_output.metadata = None
179        }
180
181        if self.config.metadata.is_some() {
182            get_object_output.metadata = Some(self.config.metadata.as_ref().unwrap().clone());
183        }
184
185        if self.config.put_last_modified_metadata {
186            get_object_output = Self::modify_last_modified_metadata(get_object_output);
187        }
188
189        if self.config.enable_versioning {
190            get_object_output = Self::update_versioning_metadata(get_object_output);
191        }
192
193        get_object_output
194    }
195
196    fn modify_last_modified_metadata(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
197        let mut metadata = get_object_output.metadata().cloned().unwrap_or_default();
198        let last_modified = DateTime::from_millis(
199            get_object_output
200                .last_modified()
201                .unwrap()
202                .to_millis()
203                .unwrap(),
204        )
205        .to_chrono_utc()
206        .unwrap()
207        .to_rfc3339_opts(SecondsFormat::Secs, false);
208
209        metadata.insert(
210            S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY.to_string(),
211            last_modified,
212        );
213        get_object_output.metadata = Some(metadata);
214
215        get_object_output
216    }
217
218    fn clear_system_meta_data(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
219        get_object_output.content_disposition = None;
220        get_object_output.content_encoding = None;
221        get_object_output.content_language = None;
222        get_object_output.content_type = None;
223        get_object_output.cache_control = None;
224        get_object_output.expires_string = None;
225        get_object_output.website_redirect_location = None;
226
227        get_object_output
228    }
229
230    fn update_versioning_metadata(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
231        if get_object_output.version_id().is_none() {
232            return get_object_output;
233        }
234
235        let source_version_id = get_object_output.version_id().unwrap();
236
237        let mut metadata = get_object_output.metadata().cloned().unwrap_or_default();
238
239        let last_modified = DateTime::from_millis(
240            get_object_output
241                .last_modified()
242                .unwrap()
243                .to_millis()
244                .unwrap(),
245        )
246        .to_chrono_utc()
247        .unwrap()
248        .to_rfc3339_opts(SecondsFormat::Secs, false);
249
250        metadata.insert(
251            S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY.to_string(),
252            source_version_id.to_string(),
253        );
254        metadata.insert(
255            S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY.to_string(),
256            last_modified,
257        );
258
259        get_object_output.metadata = Some(metadata);
260
261        get_object_output
262    }
263
264    pub async fn upload_with_auto_chunksize(
265        &mut self,
266        bucket: &str,
267        key: &str,
268        get_object_output_first_chunk: GetObjectOutput,
269    ) -> Result<PutObjectOutput> {
270        if self.object_parts.as_ref().unwrap().is_empty() {
271            panic!("Illegal object_parts state");
272        }
273
274        let put_object_output = self
275            .multipart_upload(bucket, key, get_object_output_first_chunk)
276            .await?;
277
278        trace!(key = key, "{put_object_output:?}");
279
280        Ok(put_object_output)
281    }
282
    /// Runs a full multipart upload for `bucket`/`key`: assembles the upload
    /// metadata (configured overrides take precedence over values copied from
    /// the source object), invokes the preprocess callback, creates the
    /// multipart upload, then uploads all parts and completes it. On any
    /// failure after creation, the multipart upload is aborted so orphaned
    /// parts are not left behind.
    async fn multipart_upload(
        &mut self,
        bucket: &str,
        key: &str,
        get_object_output_first_chunk: GetObjectOutput,
    ) -> Result<PutObjectOutput> {
        // Configured storage class wins over the source object's.
        let storage_class = if self.config.storage_class.is_none() {
            get_object_output_first_chunk.storage_class().cloned()
        } else {
            self.config.storage_class.clone()
        };

        let checksum_type = if self.config.full_object_checksum {
            Some(ChecksumType::FullObject)
        } else {
            None
        };

        // For every attribute: use the configured override when present,
        // otherwise propagate the source object's value.
        let mut upload_metadata = UploadMetadata {
            acl: self.config.canned_acl.clone(),
            cache_control: if self.config.cache_control.is_none() {
                get_object_output_first_chunk
                    .cache_control()
                    .map(|value| value.to_string())
            } else {
                self.config.cache_control.clone()
            },
            content_disposition: if self.config.content_disposition.is_none() {
                get_object_output_first_chunk
                    .content_disposition()
                    .map(|value| value.to_string())
            } else {
                self.config.content_disposition.clone()
            },
            content_encoding: if self.config.content_encoding.is_none() {
                get_object_output_first_chunk
                    .content_encoding()
                    .map(|value| value.to_string())
            } else {
                self.config.content_encoding.clone()
            },
            content_language: if self.config.content_language.is_none() {
                get_object_output_first_chunk
                    .content_language()
                    .map(|value| value.to_string())
            } else {
                self.config.content_language.clone()
            },
            content_type: if self.config.content_type.is_none() {
                get_object_output_first_chunk
                    .content_type()
                    .map(|value| value.to_string())
            } else {
                self.config.content_type.clone()
            },
            expires: if self.config.expires.is_none() {
                // Source supplies Expires as an HTTP-date string; parse it.
                get_object_output_first_chunk
                    .expires_string()
                    .map(|expires_string| {
                        DateTime::from_str(expires_string, DateTimeFormat::HttpDate).unwrap()
                    })
            } else {
                // Configured value is round-tripped through RFC 3339.
                Some(DateTime::from_str(
                    &self.config.expires.unwrap().to_rfc3339(),
                    DateTimeFormat::DateTimeWithOffset,
                )?)
            },
            metadata: if self.config.metadata.is_none() {
                get_object_output_first_chunk.metadata().cloned()
            } else {
                self.config.metadata.clone()
            },
            request_payer: self.request_payer.clone(),
            storage_class: storage_class.clone(),
            website_redirect_location: if self.config.website_redirect.is_none() {
                get_object_output_first_chunk
                    .website_redirect_location()
                    .map(|value| value.to_string())
            } else {
                self.config.website_redirect.clone()
            },
            tagging: self.tagging.clone(),
        };

        // The preprocess callback may mutate upload_metadata or cancel the
        // upload entirely; cancellation is not an error.
        let callback_result = self
            .config
            .preprocess_manager
            .execute_preprocessing(key, &get_object_output_first_chunk, &mut upload_metadata)
            .await;
        if let Err(e) = callback_result {
            if is_callback_cancelled(&e) {
                debug!(
                    key = key,
                    "PreprocessCallback execute_preprocessing() cancelled. Skipping upload."
                );
                // An empty (default) output signals "skipped" to the caller.
                return Ok(PutObjectOutputBuilder::default().build());
            } else {
                return Err(e.context("PreprocessCallback before_upload() failed."));
            }
        }

        let create_multipart_upload_output = self
            .client
            .create_multipart_upload()
            .set_request_payer(upload_metadata.request_payer)
            .set_storage_class(upload_metadata.storage_class)
            .bucket(bucket)
            .key(key)
            .set_metadata(upload_metadata.metadata)
            .set_tagging(upload_metadata.tagging)
            .set_website_redirect_location(upload_metadata.website_redirect_location)
            .set_content_type(upload_metadata.content_type)
            .set_content_encoding(upload_metadata.content_encoding)
            .set_cache_control(upload_metadata.cache_control)
            .set_content_disposition(upload_metadata.content_disposition)
            .set_content_language(upload_metadata.content_language)
            .set_expires(upload_metadata.expires)
            .set_server_side_encryption(self.config.sse.clone())
            .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
            .set_sse_customer_algorithm(self.config.target_sse_c.clone())
            .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
            .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
            .set_acl(upload_metadata.acl)
            .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
            .set_checksum_type(checksum_type)
            .send()
            .await
            .context("aws_sdk_s3::client::Client create_multipart_upload() failed.")?;
        // A successful CreateMultipartUpload always carries an upload id.
        let upload_id = create_multipart_upload_output.upload_id().unwrap();

        let upload_result = self
            .upload_parts_and_complete(bucket, key, upload_id, get_object_output_first_chunk)
            .await
            .context("upload_parts() failed.");
        if upload_result.is_err() {
            // Abort so partially-uploaded parts do not keep accruing storage.
            self.abort_multipart_upload(bucket, key, upload_id).await?;
            return Err(upload_result.err().unwrap());
        }

        upload_result
    }
424
    /// Aborts an in-progress multipart upload so its already-uploaded parts are
    /// discarded after a failed transfer.
    #[rustfmt::skip] // For coverage tool incorrectness
    async fn abort_multipart_upload(&self, bucket: &str, key: &str, upload_id: &str) -> Result<AbortMultipartUploadOutput> {
        self.client.abort_multipart_upload().set_request_payer(self.request_payer.clone()).bucket(bucket).key(key).upload_id(upload_id).send().await.context("aws_sdk_s3::client::Client abort_multipart_upload() failed.")
    }
429
    /// Uploads all parts for an open multipart upload and completes it, then
    /// fires the SYNC_COMPLETE event and performs optional ETag and
    /// additional-checksum verification against the source.
    async fn upload_parts_and_complete(
        &mut self,
        bucket: &str,
        key: &str,
        upload_id: &str,
        get_object_output_first_chunk: GetObjectOutput,
    ) -> Result<PutObjectOutput> {
        // Capture source attributes before the GetObjectOutput is consumed by
        // the part-upload routines below.
        let source_sse = get_object_output_first_chunk
            .server_side_encryption()
            .cloned();
        // Presence of an ETag distinguishes a remote (S3) source from a local one.
        let source_remote_storage = get_object_output_first_chunk.e_tag().is_some();
        let source_content_length = self.source_total_size;
        let source_e_tag = get_object_output_first_chunk
            .e_tag()
            .map(|e_tag| e_tag.to_string());
        let source_local_storage = source_e_tag.is_none();
        let source_checksum = self.source_additional_checksum.clone();
        let source_storage_class = get_object_output_first_chunk.storage_class().cloned();
        let source_version_id = get_object_output_first_chunk
            .version_id()
            .map(|v| v.to_string());
        let source_last_modified = get_object_output_first_chunk.last_modified().copied();

        // Auto-chunksize mirrors the source's part boundaries; otherwise the
        // configured chunk size is used.
        let upload_parts = if self.is_auto_chunksize_enabled() {
            self.upload_parts_with_auto_chunksize(
                bucket,
                key,
                upload_id,
                get_object_output_first_chunk,
            )
            .await
            .context("upload_parts_with_auto_chunksize() failed.")?
        } else {
            self.upload_parts(bucket, key, upload_id, get_object_output_first_chunk)
                .await
                .context("upload_parts() failed.")?
        };

        let checksum_type = if self.config.full_object_checksum {
            Some(ChecksumType::FullObject)
        } else {
            None
        };

        let completed_multipart_upload = CompletedMultipartUpload::builder()
            .set_parts(Some(upload_parts))
            .build();

        let complete_multipart_upload_output = self
            .client
            .complete_multipart_upload()
            .set_request_payer(self.request_payer.clone())
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id)
            .multipart_upload(completed_multipart_upload)
            .set_sse_customer_algorithm(self.config.target_sse_c.clone())
            .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
            .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
            .set_checksum_type(checksum_type)
            .set_if_match(self.if_match.clone())
            .set_if_none_match(self.if_none_match.clone())
            .send()
            .await
            .context("aws_sdk_s3::client::Client complete_multipart_upload() failed.")?;

        trace!(
            key = key,
            upload_id = upload_id,
            if_match = self.if_match.clone(),
            "{complete_multipart_upload_output:?}"
        );

        // Announce completion before verification; verification failures are
        // reported as warnings/events, not errors.
        let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
        event_data.key = Some(key.to_string());
        // skipcq: RS-W1070
        event_data.source_version_id = source_version_id.clone();
        event_data.target_version_id = complete_multipart_upload_output
            .version_id
            .clone()
            .map(|v| v.to_string());
        // skipcq: RS-W1070
        event_data.source_etag = source_e_tag.clone();
        event_data.source_last_modified = source_last_modified;
        event_data.source_size = Some(self.source_total_size);
        event_data.target_size = Some(self.source_total_size); // Assuming the size is the same as source
        self.config.event_manager.trigger_event(event_data).await;

        // Local sources have no ETag; synthesize one from the uploaded parts
        // so it can be compared against the target's multipart ETag.
        let source_e_tag = if source_local_storage {
            Some(self.generate_e_tag_hash(self.calculate_parts_count(source_content_length as i64)))
        } else {
            source_e_tag
        };

        // ETag verification is meaningless without Content-MD5 headers or on
        // Express One Zone storage (either side), so skip it in those cases.
        let mut target_e_tag = None;
        if !self.config.disable_etag_verify
            && !self.express_onezone_storage
            && !self.config.disable_content_md5_header
            && source_storage_class != Some(StorageClass::ExpressOnezone)
        {
            let target_sse = complete_multipart_upload_output
                .server_side_encryption()
                .cloned();
            target_e_tag = complete_multipart_upload_output
                .e_tag()
                .map(|e| e.to_string());

            self.verify_e_tag(
                key,
                &source_sse,
                source_remote_storage,
                &source_e_tag,
                &target_sse,
                &target_e_tag,
                source_version_id.clone(),
                complete_multipart_upload_output
                    .version_id
                    .clone()
                    .map(|v| v.to_string()),
                source_last_modified,
                self.source_total_size,
                self.source_total_size, // Assuming the size is the same as source
            )
            .await;
        }

        if !self.config.disable_additional_checksum_verify {
            let target_checksum = get_additional_checksum_from_multipart_upload_result(
                &complete_multipart_upload_output,
                self.config.additional_checksum_algorithm.clone(),
            );

            self.validate_checksum(
                key,
                source_checksum,
                target_checksum,
                &source_e_tag,
                &target_e_tag,
                source_remote_storage,
                source_version_id,
                complete_multipart_upload_output
                    .version_id
                    .clone()
                    .map(|v| v.to_string()),
                source_last_modified,
                self.source_total_size,
                self.source_total_size, // Assuming the size is the same as source
            )
            .await;
        }

        Ok(PutObjectOutput::builder()
            .e_tag(complete_multipart_upload_output.e_tag().unwrap())
            .build())
    }
585
    /// Compares source and target ETags after an upload and reports the
    /// outcome: emits an ETagVerified stat/event on match, or a SyncWarning,
    /// mismatch event, and warn-level log on mismatch. A `None` verify result
    /// (verification not applicable) produces no output at all.
    #[allow(clippy::too_many_arguments)]
    async fn verify_e_tag(
        &mut self,
        key: &str,
        source_sse: &Option<ServerSideEncryption>,
        source_remote_storage: bool,
        source_e_tag: &Option<String>,
        target_sse: &Option<ServerSideEncryption>,
        target_e_tag: &Option<String>,
        source_version_id: Option<String>,
        target_version_id: Option<String>,
        source_last_modified: Option<DateTime>,
        source_content_length: u64,
        target_content_length: u64,
    ) {
        // Returns Some(matched) when verification was possible, None otherwise
        // (e.g. SSE-C or disabled multipart verification make ETags opaque).
        let verify_result = storage::e_tag_verify::verify_e_tag(
            !self.config.disable_multipart_verify,
            &self.config.source_sse_c,
            &self.config.target_sse_c,
            source_sse,
            source_e_tag,
            target_sse,
            target_e_tag,
        );

        // Pre-build the event payload; the event type is filled in per branch.
        let mut event_data = EventData::new(EventType::UNDEFINED);
        event_data.key = Some(key.to_string());
        event_data.source_version_id = source_version_id;
        event_data.target_version_id = target_version_id;
        // skipcq: RS-W1070
        event_data.source_etag = source_e_tag.clone();
        // skipcq: RS-W1070
        event_data.target_etag = target_e_tag.clone();
        event_data.source_last_modified = source_last_modified;
        event_data.source_size = Some(source_content_length);
        event_data.target_size = Some(target_content_length);

        if let Some(e_tag_match) = verify_result {
            if !e_tag_match {
                // A multipart-source mismatch with verification disabled is
                // expected noise, not a warning.
                if source_remote_storage
                    && is_multipart_upload_e_tag(source_e_tag)
                    && self.config.disable_multipart_verify
                {
                    debug!(
                        key = &key,
                        source_e_tag = source_e_tag,
                        target_e_tag = target_e_tag,
                        "skip e_tag verification"
                    );
                } else {
                    // Multipart mismatches may be chunk-size artifacts, so the
                    // longer help message applies unless auto-chunksize was on.
                    let message = if source_remote_storage
                        && is_multipart_upload_e_tag(source_e_tag)
                        && !self.is_auto_chunksize_enabled()
                    {
                        format!("{} {}", "e_tag", MISMATCH_WARNING_WITH_HELP)
                    } else {
                        "e_tag mismatch. file in the target storage may be corrupted.".to_string()
                    };

                    self.send_stats(SyncWarning {
                        key: key.to_string(),
                    })
                    .await;
                    self.has_warning.store(true, Ordering::SeqCst);

                    event_data.event_type = EventType::SYNC_ETAG_MISMATCH;
                    self.config.event_manager.trigger_event(event_data).await;

                    // NOTE(review): assumes verify_e_tag() only returns Some
                    // when both ETags are present; these unwraps would panic
                    // otherwise — confirm in e_tag_verify.
                    let source_e_tag = source_e_tag.clone().unwrap();
                    let target_e_tag = target_e_tag.clone().unwrap();

                    warn!(
                        key = &key,
                        source_e_tag = source_e_tag,
                        target_e_tag = target_e_tag,
                        message
                    );
                }
            } else {
                self.send_stats(ETagVerified {
                    key: key.to_string(),
                })
                .await;

                event_data.event_type = EventType::SYNC_ETAG_VERIFIED;
                self.config.event_manager.trigger_event(event_data).await;

                debug!(
                    key = &key,
                    source_e_tag = source_e_tag,
                    target_e_tag = target_e_tag,
                    "e_tag verified."
                );
            }
        }
    }
682
683    // skipcq: RS-R1000
684    async fn upload_parts(
685        &mut self,
686        bucket: &str,
687        key: &str,
688        upload_id: &str,
689        get_object_output_first_chunk: GetObjectOutput,
690    ) -> Result<Vec<CompletedPart>> {
691        let shared_source_version_id = get_object_output_first_chunk
692            .version_id()
693            .map(|v| v.to_string());
694        let shared_multipart_etags = Arc::new(Mutex::new(Vec::new()));
695        let shared_upload_parts = Arc::new(Mutex::new(Vec::new()));
696        let shared_total_upload_size = Arc::new(Mutex::new(Vec::new()));
697
698        let first_chunk_size = get_object_output_first_chunk.content_length().unwrap();
699        let config_chunksize = self.config.transfer_config.multipart_chunksize as usize;
700        let source_total_size = self.source_total_size as usize;
701        let source_version_id = get_object_output_first_chunk
702            .version_id()
703            .map(|v| v.to_string());
704
705        let mut body = get_object_output_first_chunk.body.into_async_read();
706
707        let mut upload_parts_join_handles = FuturesUnordered::new();
708        let mut part_number = 1;
709        for offset in (0..self.source_total_size as usize).step_by(config_chunksize) {
710            if self.cancellation_token.is_cancelled() {
711                return Err(anyhow!(S3syncError::Cancelled));
712            }
713
714            let source = dyn_clone::clone_box(&*(self.source));
715            let source_key = self.source_key.clone();
716            let copy_source = if self.config.server_side_copy {
717                self.source
718                    .generate_copy_source_key(source_key.as_ref(), source_version_id.clone())
719            } else {
720                "".to_string()
721            };
722            let copy_source_if_match = self.copy_source_if_match.clone();
723            let source_version_id = shared_source_version_id.clone();
724            let source_sse_c = self.config.source_sse_c.clone();
725            let source_sse_c_key = self.config.source_sse_c_key.clone();
726            let source_sse_c_key_string = self.config.source_sse_c_key.clone().key.clone();
727            let source_sse_c_key_md5 = self.config.source_sse_c_key_md5.clone();
728
729            let target = dyn_clone::clone_box(&*(self.client));
730            let target_bucket = bucket.to_string();
731            let target_key = key.to_string();
732            let target_upload_id = upload_id.to_string();
733            let target_sse_c = self.config.target_sse_c.clone();
734            let target_sse_c_key = self.config.target_sse_c_key.clone().key.clone();
735            let target_sse_c_key_md5 = self.config.target_sse_c_key_md5.clone();
736
737            let chunksize = if offset + config_chunksize > source_total_size {
738                source_total_size - offset
739            } else {
740                config_chunksize
741            };
742
743            let upload_parts = Arc::clone(&shared_upload_parts);
744            let multipart_etags = Arc::clone(&shared_multipart_etags);
745            let total_upload_size = Arc::clone(&shared_total_upload_size);
746
747            let additional_checksum_mode = self.config.additional_checksum_mode.clone();
748            let additional_checksum_algorithm = self.config.additional_checksum_algorithm.clone();
749            let disable_payload_signing = self.config.disable_payload_signing;
750            let multipart_chunksize = self.config.transfer_config.multipart_chunksize;
751            let express_onezone_storage = self.express_onezone_storage;
752            let disable_content_md5_header = self.config.disable_content_md5_header;
753            let request_payer = self.request_payer.clone();
754            let server_side_copy = self.config.server_side_copy;
755
756            let stats_sender = self.stats_sender.clone();
757            let event_manager = self.config.event_manager.clone();
758
759            let mut buffer = if !server_side_copy {
760                let mut buffer = Vec::<u8>::with_capacity(multipart_chunksize as usize);
761                buffer.resize_with(chunksize, Default::default);
762                buffer
763            } else {
764                Vec::new() // For server-side copy, we do not need to read the body.
765            };
766
767            // For the first part, we read the data from the supplied body.
768            if part_number == 1 && !server_side_copy {
769                let result = body.read_exact(buffer.as_mut_slice()).await;
770                if let Err(e) = result {
771                    warn!(
772                        key = &source_key,
773                        part_number = part_number,
774                        "Failed to read data from the body: {e:?}"
775                    );
776                    return Err(anyhow!(S3syncError::DownloadForceRetryableError));
777                }
778            }
779
780            let permit = self
781                .config
782                .clone()
783                .target_client_config
784                .unwrap()
785                .parallel_upload_semaphore
786                .acquire_owned()
787                .await?;
788            let task: JoinHandle<Result<()>> = task::spawn(async move {
789                let _permit = permit; // Keep the semaphore permit in scope
790                let range = format!("bytes={}-{}", offset, offset + chunksize - 1);
791
792                debug!(
793                    key = &target_key,
794                    part_number = part_number,
795                    "upload_part() start. range = {range:?}",
796                );
797
798                let upload_size;
799                // If the part number is greater than 1, we need to get the object from the source storage.
800                if part_number > 1 {
801                    if !server_side_copy {
802                        let get_object_output = source
803                            .get_object(
804                                &source_key,
805                                source_version_id.clone(),
806                                additional_checksum_mode,
807                                Some(range.clone()),
808                                source_sse_c.clone(),
809                                source_sse_c_key,
810                                source_sse_c_key_md5.clone(),
811                            )
812                            .await
813                            .context("source.get_object() failed.")?;
814                        upload_size = get_object_output.content_length().unwrap();
815
816                        if get_object_output.content_range().is_none() {
817                            error!("get_object() returned no content range. This is unexpected.");
818                            return Err(anyhow!(
819                                "get_object() returned no content range. This is unexpected. key={}.",
820                                &target_key
821                            ));
822                        }
823                        let (request_start, request_end) = parse_range_header_string(&range)
824                            .context("failed to parse request range header")?;
825                        let (response_start, response_end) =
826                            get_range_from_content_range(&get_object_output)
827                                .context("get_object() returned no content range")?;
828                        if (request_start != response_start) || (request_end != response_end) {
829                            return Err(anyhow!(
830                                "get_object() returned unexpected content range. \
831                                expected: {}-{}, actual: {}-{}",
832                                request_start,
833                                request_end,
834                                response_start,
835                                response_end,
836                            ));
837                        }
838
839                        let mut body = convert_to_buf_byte_stream_with_callback(
840                            get_object_output.body.into_async_read(),
841                            source.get_stats_sender().clone(),
842                            source.get_rate_limit_bandwidth(),
843                            None,
844                            None,
845                        )
846                        .into_async_read();
847
848                        let result = body.read_exact(buffer.as_mut_slice()).await;
849                        if let Err(e) = result {
850                            warn!(
851                                key = &source_key,
852                                part_number = part_number,
853                                "Failed to read data from the body: {e:?}"
854                            );
855                            return Err(anyhow!(S3syncError::DownloadForceRetryableError));
856                        }
857                    } else {
858                        upload_size = chunksize as i64;
859                    }
860                } else {
861                    upload_size = first_chunk_size;
862                }
863
864                let md5_digest;
865                let md5_digest_base64 =
866                    if !express_onezone_storage && !disable_content_md5_header && !server_side_copy
867                    {
868                        let md5_digest_raw = md5::compute(&buffer);
869                        md5_digest = Some(md5_digest_raw);
870                        Some(general_purpose::STANDARD.encode(md5_digest_raw.as_slice()))
871                    } else {
872                        md5_digest = None;
873                        None
874                    };
875
876                let upload_part_output;
877                if !server_side_copy {
878                    let builder = target
879                        .upload_part()
880                        .set_request_payer(request_payer)
881                        .bucket(&target_bucket)
882                        .key(&target_key)
883                        .upload_id(target_upload_id.clone())
884                        .part_number(part_number)
885                        .set_content_md5(md5_digest_base64)
886                        .content_length(chunksize as i64)
887                        .set_checksum_algorithm(additional_checksum_algorithm)
888                        .set_sse_customer_algorithm(target_sse_c)
889                        .set_sse_customer_key(target_sse_c_key)
890                        .set_sse_customer_key_md5(target_sse_c_key_md5)
891                        .body(ByteStream::from(buffer));
892
893                    upload_part_output = if disable_payload_signing {
894                        builder
895                            .customize()
896                            .disable_payload_signing()
897                            .send()
898                            .await
899                            .context("aws_sdk_s3::client::Client upload_part() failed.")?
900                    } else {
901                        builder
902                            .send()
903                            .await
904                            .context("aws_sdk_s3::client::Client upload_part() failed.")?
905                    };
906
907                    debug!(
908                        key = &target_key,
909                        part_number = part_number,
910                        "upload_part() complete",
911                    );
912
913                    trace!(key = &target_key, "{upload_part_output:?}");
914
915                    #[allow(clippy::unnecessary_unwrap)]
916                    if md5_digest.is_some() {
917                        let mut locked_multipart_etags = multipart_etags.lock().unwrap();
918                        locked_multipart_etags.push(MutipartEtags {
919                            digest: md5_digest.as_ref().unwrap().as_slice().to_vec(),
920                            part_number,
921                        });
922                    }
923                } else {
924                    let upload_part_copy_output = target
925                        .upload_part_copy()
926                        .copy_source(copy_source)
927                        .set_request_payer(request_payer)
928                        .set_copy_source_range(Some(range))
929                        .bucket(&target_bucket)
930                        .key(&target_key)
931                        .upload_id(target_upload_id.clone())
932                        .part_number(part_number)
933                        .set_copy_source_sse_customer_algorithm(source_sse_c)
934                        .set_copy_source_sse_customer_key(source_sse_c_key_string)
935                        .set_copy_source_sse_customer_key_md5(source_sse_c_key_md5)
936                        .set_sse_customer_algorithm(target_sse_c)
937                        .set_sse_customer_key(target_sse_c_key)
938                        .set_sse_customer_key_md5(target_sse_c_key_md5)
939                        .set_copy_source_if_match(copy_source_if_match.clone())
940                        .send()
941                        .await?;
942
943                    debug!(
944                        key = &target_key,
945                        part_number = part_number,
946                        copy_source_if_match = copy_source_if_match,
947                        "upload_part_copy() complete",
948                    );
949
950                    trace!(key = &target_key, "{upload_part_copy_output:?}");
951
952                    let _ =
953                        stats_sender.send_blocking(SyncStatistics::SyncBytes(upload_size as u64));
954
955                    upload_part_output =
956                        convert_copy_to_upload_part_output(upload_part_copy_output);
957                }
958
959                let mut event_data = EventData::new(EventType::SYNC_WRITE);
960                event_data.key = Some(target_key.clone());
961                // skipcq: RS-W1070
962                event_data.source_version_id = source_version_id.clone();
963                event_data.source_size = Some(source_total_size as u64);
964                event_data.upload_id = Some(target_upload_id.clone());
965                event_data.part_number = Some(part_number as u64);
966                event_data.byte_written = Some(upload_size as u64);
967                event_manager.trigger_event(event_data).await;
968
969                let mut upload_size_vec = total_upload_size.lock().unwrap();
970                upload_size_vec.push(upload_size);
971
972                let mut locked_upload_parts = upload_parts.lock().unwrap();
973                locked_upload_parts.push(
974                    CompletedPart::builder()
975                        .e_tag(upload_part_output.e_tag().unwrap())
976                        .set_checksum_sha256(
977                            upload_part_output
978                                .checksum_sha256()
979                                .map(|digest| digest.to_string()),
980                        )
981                        .set_checksum_sha1(
982                            upload_part_output
983                                .checksum_sha1()
984                                .map(|digest| digest.to_string()),
985                        )
986                        .set_checksum_crc32(
987                            upload_part_output
988                                .checksum_crc32()
989                                .map(|digest| digest.to_string()),
990                        )
991                        .set_checksum_crc32_c(
992                            upload_part_output
993                                .checksum_crc32_c()
994                                .map(|digest| digest.to_string()),
995                        )
996                        .set_checksum_crc64_nvme(
997                            upload_part_output
998                                .checksum_crc64_nvme()
999                                .map(|digest| digest.to_string()),
1000                        )
1001                        .part_number(part_number)
1002                        .build(),
1003                );
1004
1005                trace!(
1006                    key = &target_key,
1007                    upload_id = &target_upload_id,
1008                    "{locked_upload_parts:?}"
1009                );
1010
1011                Ok(())
1012            });
1013
1014            upload_parts_join_handles.push(task);
1015
1016            part_number += 1;
1017        }
1018
1019        while let Some(result) = upload_parts_join_handles.next().await {
1020            result??;
1021            if self.cancellation_token.is_cancelled() {
1022                return Err(anyhow!(S3syncError::Cancelled));
1023            }
1024        }
1025
1026        let total_upload_size: i64 = shared_total_upload_size.lock().unwrap().iter().sum();
1027        if total_upload_size == self.source_total_size as i64 {
1028            debug!(
1029                key,
1030                total_upload_size, "multipart upload completed successfully."
1031            );
1032        } else {
1033            return Err(anyhow!(format!(
1034                "multipart upload size mismatch: key={key}, expected = {0}, actual {total_upload_size}",
1035                self.source_total_size
1036            )));
1037        }
1038
1039        // Etags are concatenated in the order of part number. Otherwise, ETag verification will fail.
1040        let mut locked_multipart_etags = shared_multipart_etags.lock().unwrap();
1041        locked_multipart_etags.sort_by_key(|e| e.part_number);
1042        for etag in locked_multipart_etags.iter() {
1043            self.concatnated_md5_hash.append(&mut etag.digest.clone());
1044        }
1045
1046        // CompletedParts must be sorted by part number. Otherwise, CompleteMultipartUpload will fail.
1047        let mut parts = shared_upload_parts.lock().unwrap().clone();
1048        parts.sort_by_key(|part| part.part_number.unwrap());
1049        Ok(parts)
1050    }
1051
    // skipcq: RS-R1000
    /// Uploads every part of a multipart upload using the *source object's own
    /// part layout* ("auto-chunksize"): part N's size is taken from
    /// `self.object_parts[N-1].size()`, so the target's part boundaries — and
    /// therefore its multipart ETag — can mirror the source's.
    ///
    /// One Tokio task is spawned per part, bounded by the target client's
    /// `parallel_upload_semaphore` (the owned permit is moved into the task and
    /// released when the task finishes). Part 1's bytes come from the already
    /// fetched `get_object_output_first_chunk` body; parts > 1 are re-fetched
    /// from the source by byte range — or, when `server_side_copy` is enabled,
    /// copied server-side via `upload_part_copy()` without reading any body.
    ///
    /// Side effects: appends each part's raw MD5 digest (part-number order) to
    /// `self.concatnated_md5_hash` for later ETag verification, sends
    /// `SyncBytes` statistics for server-side copies, and triggers a
    /// `SYNC_WRITE` event per part.
    ///
    /// Returns the `CompletedPart` list sorted by part number (required by
    /// CompleteMultipartUpload), or an error on cancellation, range mismatch,
    /// short read, SDK failure, or total-size mismatch.
    async fn upload_parts_with_auto_chunksize(
        &mut self,
        bucket: &str,
        key: &str,
        upload_id: &str,
        get_object_output_first_chunk: GetObjectOutput,
    ) -> Result<Vec<CompletedPart>> {
        // NOTE(review): `shared_source_version_id` and `source_version_id`
        // below are built from the same `version_id()` — the former is cloned
        // into each spawned task, the latter is used for `copy_source`.
        let shared_source_version_id = get_object_output_first_chunk
            .version_id()
            .map(|v| v.to_string());
        // Cross-task accumulators: completed parts, per-part MD5 digests, and
        // per-part byte counts. Tasks complete in arbitrary order, so all three
        // are sorted/summed after the join loop.
        let shared_multipart_etags = Arc::new(Mutex::new(Vec::new()));
        let shared_upload_parts = Arc::new(Mutex::new(Vec::new()));
        let shared_total_upload_size = Arc::new(Mutex::new(Vec::new()));

        let source_version_id = get_object_output_first_chunk
            .version_id()
            .map(|v| v.to_string());

        // Part 1 reuses the body of the GET that was already issued by the caller.
        let first_chunk_size = get_object_output_first_chunk.content_length().unwrap();
        let mut body = get_object_output_first_chunk.body.into_async_read();

        let mut upload_parts_join_handles = FuturesUnordered::new();
        let mut part_number = 1;
        let mut offset = 0;

        // One iteration (and one spawned task) per recorded source part.
        while part_number <= self.object_parts.as_ref().unwrap().len() as i32 {
            if self.cancellation_token.is_cancelled() {
                return Err(anyhow!(S3syncError::Cancelled));
            }

            // Clone everything the spawned task needs so the async move block
            // is 'static and independent of `self`.
            let source = dyn_clone::clone_box(&*(self.source));
            let source_key = self.source_key.clone();
            let source_total_size = self.source_total_size as usize;
            let copy_source = if self.config.server_side_copy {
                self.source
                    .generate_copy_source_key(source_key.as_ref(), source_version_id.clone())
            } else {
                "".to_string()
            };
            let copy_source_if_match = self.copy_source_if_match.clone();
            // Shadowed per-iteration clone; same value as the outer binding.
            let source_version_id = shared_source_version_id.clone();
            let source_sse_c = self.config.source_sse_c.clone();
            let source_sse_c_key = self.config.source_sse_c_key.clone();
            let source_sse_c_key_string = self.config.source_sse_c_key.clone().key.clone();
            let source_sse_c_key_md5 = self.config.source_sse_c_key_md5.clone();

            let target = dyn_clone::clone_box(&*(self.client));
            let target_bucket = bucket.to_string();
            let target_key = key.to_string();
            let target_upload_id = upload_id.to_string();
            let target_sse_c = self.config.target_sse_c.clone();
            let target_sse_c_key = self.config.target_sse_c_key.clone().key.clone();
            let target_sse_c_key_md5 = self.config.target_sse_c_key_md5.clone();

            // This part's size, taken from the source object's part inventory
            // (1-based part number -> 0-based index).
            let object_part_chunksize = self
                .object_parts
                .as_ref()
                .unwrap()
                .get(part_number as usize - 1)
                .unwrap()
                .size()
                .unwrap();

            let upload_parts = Arc::clone(&shared_upload_parts);
            let multipart_etags = Arc::clone(&shared_multipart_etags);
            let total_upload_size = Arc::clone(&shared_total_upload_size);

            let additional_checksum_mode = self.config.additional_checksum_mode.clone();
            let additional_checksum_algorithm = self.config.additional_checksum_algorithm.clone();
            let disable_payload_signing = self.config.disable_payload_signing;
            let express_onezone_storage = self.express_onezone_storage;
            let disable_content_md5_header = self.config.disable_content_md5_header;
            let request_payer = self.request_payer.clone();
            let server_side_copy = self.config.server_side_copy;

            let stats_sender = self.stats_sender.clone();
            let event_manager = self.config.event_manager.clone();

            // Exact-size buffer for this part; read_exact() below fills it
            // completely or errors. Server-side copy never reads a body.
            let mut buffer = if !server_side_copy {
                let mut buffer = Vec::<u8>::with_capacity(object_part_chunksize as usize);
                buffer.resize_with(object_part_chunksize as usize, Default::default);
                buffer
            } else {
                Vec::new() // For server-side copy, we do not need to read the body.
            };

            // For the first part, we read the data from the supplied body.
            // NOTE(review): assumes the supplied first-chunk body holds at
            // least `object_parts[0].size()` bytes; a short body surfaces here
            // as a retryable download error.
            if part_number == 1 && !server_side_copy {
                let result = body.read_exact(buffer.as_mut_slice()).await;
                if let Err(e) = result {
                    warn!(
                        key = &source_key,
                        part_number = part_number,
                        "Failed to read data from the body: {e:?}"
                    );
                    return Err(anyhow!(S3syncError::DownloadForceRetryableError));
                }
            }

            // Bound concurrency: block here until a permit is free, then move
            // the owned permit into the task so it is held for the task's
            // whole lifetime.
            let permit = self
                .config
                .clone()
                .target_client_config
                .unwrap()
                .parallel_upload_semaphore
                .acquire_owned()
                .await?;
            let task: JoinHandle<Result<()>> = task::spawn(async move {
                let _permit = permit; // Keep the semaphore permit in scope
                let range = format!("bytes={}-{}", offset, offset + object_part_chunksize - 1);

                debug!(
                    key = &target_key,
                    part_number = part_number,
                    "upload_part() start. range = {range:?}",
                );

                let upload_size;
                // If the part number is greater than 1, we need to get the object from the source storage.
                if part_number > 1 {
                    if !server_side_copy {
                        let get_object_output = source
                            .get_object(
                                &source_key,
                                source_version_id.clone(),
                                additional_checksum_mode,
                                Some(range.clone()),
                                source_sse_c.clone(),
                                source_sse_c_key.clone(),
                                source_sse_c_key_md5.clone(),
                            )
                            .await
                            .context("source.get_object() failed.")?;
                        upload_size = get_object_output.content_length().unwrap();

                        if get_object_output.content_range().is_none() {
                            error!(
                                "get_object() - auto-chunksize returned no content range. This is unexpected."
                            );
                            return Err(anyhow!(
                                "get_object() returned no content range. This is unexpected. key={}.",
                                &target_key
                            ));
                        }
                        // Defensive check: the Content-Range the source answered
                        // with must match the Range we requested exactly.
                        let (request_start, request_end) = parse_range_header_string(&range)
                            .context("failed to parse request range header")?;
                        let (response_start, response_end) =
                            get_range_from_content_range(&get_object_output)
                                .context("get_object() returned no content range")?;
                        if (request_start != response_start) || (request_end != response_end) {
                            return Err(anyhow!(
                                "get_object() - auto-chunksize returned unexpected content range. \
                                expected: {}-{}, actual: {}-{}",
                                request_start,
                                request_end,
                                response_start,
                                response_end,
                            ));
                        }

                        // Wrap the body so source-side statistics and bandwidth
                        // rate limiting are applied while reading.
                        let mut body = convert_to_buf_byte_stream_with_callback(
                            get_object_output.body.into_async_read(),
                            source.get_stats_sender().clone(),
                            source.get_rate_limit_bandwidth(),
                            None,
                            None,
                        )
                        .into_async_read();

                        let result = body.read_exact(buffer.as_mut_slice()).await;
                        if let Err(e) = result {
                            warn!(
                                key = &source_key,
                                part_number = part_number,
                                "Failed to read data from the body: {e:?}"
                            );
                            return Err(anyhow!(S3syncError::DownloadForceRetryableError));
                        }
                    } else {
                        upload_size = object_part_chunksize;
                    }
                } else {
                    upload_size = first_chunk_size;
                }

                // Content-MD5 is skipped for S3 Express One Zone, when the user
                // disabled it, and for server-side copy (no local bytes to hash).
                let md5_digest;
                let md5_digest_base64 =
                    if !express_onezone_storage && !disable_content_md5_header && !server_side_copy
                    {
                        let md5_digest_raw = md5::compute(&buffer);
                        md5_digest = Some(md5_digest_raw);
                        Some(general_purpose::STANDARD.encode(md5_digest_raw.as_slice()))
                    } else {
                        md5_digest = None;
                        None
                    };

                let upload_part_output;
                if !server_side_copy {
                    let builder = target
                        .upload_part()
                        .set_request_payer(request_payer)
                        .bucket(&target_bucket)
                        .key(&target_key)
                        .upload_id(target_upload_id.clone())
                        .part_number(part_number)
                        .set_content_md5(md5_digest_base64)
                        .content_length(object_part_chunksize)
                        .set_checksum_algorithm(additional_checksum_algorithm)
                        .set_sse_customer_algorithm(target_sse_c)
                        .set_sse_customer_key(target_sse_c_key)
                        .set_sse_customer_key_md5(target_sse_c_key_md5)
                        .body(ByteStream::from(buffer));

                    upload_part_output = if disable_payload_signing {
                        builder
                            .customize()
                            .disable_payload_signing()
                            .send()
                            .await
                            .context("aws_sdk_s3::client::Client upload_part() failed.")?
                    } else {
                        builder
                            .send()
                            .await
                            .context("aws_sdk_s3::client::Client upload_part() failed.")?
                    };

                    debug!(
                        key = &target_key,
                        part_number = part_number,
                        "upload_part() complete",
                    );

                    trace!(key = &target_key, "{upload_part_output:?}");

                    // Record this part's raw MD5 so the caller can rebuild the
                    // concatenated digest for multipart-ETag verification.
                    if md5_digest.is_some() {
                        let mut locked_multipart_etags = multipart_etags.lock().unwrap();
                        #[allow(clippy::unnecessary_unwrap)]
                        locked_multipart_etags.push(MutipartEtags {
                            digest: md5_digest.as_ref().unwrap().as_slice().to_vec(),
                            part_number,
                        });
                    }
                } else {
                    // Server-side copy path: S3 copies the byte range directly;
                    // no data flows through this process.
                    let upload_part_copy_output = target
                        .upload_part_copy()
                        .copy_source(copy_source)
                        .set_request_payer(request_payer)
                        .set_copy_source_range(Some(range))
                        .bucket(&target_bucket)
                        .key(&target_key)
                        .upload_id(target_upload_id.clone())
                        .part_number(part_number)
                        .set_copy_source_sse_customer_algorithm(source_sse_c)
                        .set_copy_source_sse_customer_key(source_sse_c_key_string)
                        .set_copy_source_sse_customer_key_md5(source_sse_c_key_md5)
                        .set_sse_customer_algorithm(target_sse_c)
                        .set_sse_customer_key(target_sse_c_key)
                        .set_sse_customer_key_md5(target_sse_c_key_md5)
                        .set_copy_source_if_match(copy_source_if_match.clone())
                        .send()
                        .await?;

                    debug!(
                        key = &target_key,
                        part_number = part_number,
                        copy_source_if_match = copy_source_if_match,
                        "upload_part_copy() complete",
                    );

                    trace!(key = &target_key, "{upload_part_copy_output:?}");

                    // No body was read locally, so account the copied bytes here.
                    let _ =
                        stats_sender.send_blocking(SyncStatistics::SyncBytes(upload_size as u64));

                    upload_part_output =
                        convert_copy_to_upload_part_output(upload_part_copy_output);
                }

                // Notify listeners that this part was written.
                let mut event_data = EventData::new(EventType::SYNC_WRITE);
                event_data.key = Some(target_key.clone());
                // skipcq: RS-W1070
                event_data.source_version_id = source_version_id.clone();
                event_data.source_size = Some(source_total_size as u64);
                event_data.upload_id = Some(target_upload_id.clone());
                event_data.part_number = Some(part_number as u64);
                event_data.byte_written = Some(upload_size as u64);
                event_manager.trigger_event(event_data).await;

                // Collect the CompletedPart (ETag plus whichever additional
                // checksums the response carried); sorted by part number later.
                let mut locked_upload_parts = upload_parts.lock().unwrap();
                locked_upload_parts.push(
                    CompletedPart::builder()
                        .e_tag(upload_part_output.e_tag().unwrap())
                        .set_checksum_sha256(
                            upload_part_output
                                .checksum_sha256()
                                .map(|digest| digest.to_string()),
                        )
                        .set_checksum_sha1(
                            upload_part_output
                                .checksum_sha1()
                                .map(|digest| digest.to_string()),
                        )
                        .set_checksum_crc32(
                            upload_part_output
                                .checksum_crc32()
                                .map(|digest| digest.to_string()),
                        )
                        .set_checksum_crc32_c(
                            upload_part_output
                                .checksum_crc32_c()
                                .map(|digest| digest.to_string()),
                        )
                        .set_checksum_crc64_nvme(
                            upload_part_output
                                .checksum_crc64_nvme()
                                .map(|digest| digest.to_string()),
                        )
                        .part_number(part_number)
                        .build(),
                );

                let mut upload_size_vec = total_upload_size.lock().unwrap();
                upload_size_vec.push(upload_size);

                trace!(
                    key = &target_key,
                    upload_id = &target_upload_id,
                    "{locked_upload_parts:?}"
                );

                Ok(())
            });

            upload_parts_join_handles.push(task);

            offset += object_part_chunksize;
            part_number += 1;
        }

        // Await all part tasks; `result??` surfaces both JoinError (panic) and
        // the task's own Result. Cancellation is re-checked between joins.
        while let Some(result) = upload_parts_join_handles.next().await {
            result??;
            if self.cancellation_token.is_cancelled() {
                return Err(anyhow!(S3syncError::Cancelled));
            }
        }

        // Sanity check: the bytes uploaded across all parts must equal the
        // source object's size.
        let total_upload_size: i64 = shared_total_upload_size.lock().unwrap().iter().sum();
        if total_upload_size == self.source_total_size as i64 {
            debug!(
                key,
                total_upload_size, "multipart upload(auto-chunksize) completed successfully."
            );
        } else {
            return Err(anyhow!(format!(
                "multipart upload(auto-chunksize) size mismatch: key={key}, expected = {0}, actual {total_upload_size}",
                self.source_total_size
            )));
        }

        // Etags are concatenated in the order of part number. Otherwise, ETag verification will fail.
        let mut locked_multipart_etags = shared_multipart_etags.lock().unwrap();
        locked_multipart_etags.sort_by_key(|e| e.part_number);
        for etag in locked_multipart_etags.iter() {
            self.concatnated_md5_hash.append(&mut etag.digest.clone());
        }

        // CompletedParts must be sorted by part number. Otherwise, CompleteMultipartUpload will fail.
        let mut parts = shared_upload_parts.lock().unwrap().clone();
        parts.sort_by_key(|part| part.part_number.unwrap());
        Ok(parts)
    }
1426
    // skipcq: RS-R1000
    /// Uploads the source object to `bucket`/`key` with a single request.
    ///
    /// Depending on `config.server_side_copy` this either:
    /// - reads the whole source body into memory and issues `PutObject`, or
    /// - issues a server-side `CopyObject` so no data flows through this process.
    ///
    /// After the upload it triggers `SYNC_WRITE`/`SYNC_COMPLETE` events and,
    /// unless disabled by configuration, verifies the target ETag and the
    /// additional checksum against the source.
    ///
    /// Returns the `PutObjectOutput` (converted from the `CopyObjectOutput` in
    /// the server-side copy case). When the preprocess callback cancels the
    /// upload, a default (empty) `PutObjectOutput` is returned instead.
    async fn singlepart_upload(
        &mut self,
        bucket: &str,
        key: &str,
        mut get_object_output: GetObjectOutput,
    ) -> Result<PutObjectOutput> {
        // Snapshot source attributes needed later for events and verification,
        // before the output's body is consumed below.
        let source_sse = get_object_output.server_side_encryption().cloned();
        let source_remote_storage = get_object_output.e_tag().is_some();
        let source_e_tag = get_object_output.e_tag().map(|e_tag| e_tag.to_string());
        // An absent ETag indicates the source is local storage, not S3.
        let source_local_storage = source_e_tag.is_none();
        let source_checksum = self.source_additional_checksum.clone();
        let source_storage_class = get_object_output.storage_class().cloned();
        let source_version_id = get_object_output.version_id().map(|v| v.to_string());
        let source_last_modified = get_object_output.last_modified().copied();

        let buffer = if !self.config.server_side_copy {
            // Take ownership of the body stream and leave an empty stream in
            // its place so `get_object_output` remains usable afterwards
            // (it is passed to the preprocess callback below).
            let mut body = get_object_output.body.into_async_read();
            get_object_output.body = ByteStream::from_static(b"");

            // Read the entire object into a buffer sized to the known source
            // size; the full buffer is needed e.g. for Content-MD5 below.
            let mut buffer = Vec::<u8>::with_capacity(self.source_total_size as usize);
            buffer.resize_with(self.source_total_size as usize, Default::default);

            let result = body.read_exact(buffer.as_mut_slice()).await;
            if let Err(e) = result {
                // A short or failed read is surfaced as a retryable download
                // error rather than a fatal one.
                warn!(key = &key, "Failed to read data from the body: {e:?}");
                return Err(anyhow!(S3syncError::DownloadForceRetryableError));
            }

            buffer
        } else {
            Vec::new() // For server-side copy, we do not need to read the body.
        };

        // Content-MD5 is only computed when applicable: it is skipped for
        // S3 Express One Zone, when explicitly disabled, and for server-side
        // copy (no body is sent in that case).
        let md5_digest_base64 = if !self.express_onezone_storage
            && !self.config.disable_content_md5_header
            && !self.config.server_side_copy
        {
            let md5_digest = md5::compute(&buffer);

            // Accumulate the raw digest; it feeds ETag generation/verification.
            self.concatnated_md5_hash
                .append(&mut md5_digest.as_slice().to_vec());

            Some(general_purpose::STANDARD.encode(md5_digest.as_slice()))
        } else {
            None
        };

        let buffer_stream = ByteStream::from(buffer);

        // An explicitly configured storage class overrides the source's.
        let storage_class = if self.config.storage_class.is_none() {
            get_object_output.storage_class().cloned()
        } else {
            self.config.storage_class.clone()
        };

        // Build the upload metadata. For each attribute, a value set in the
        // configuration takes precedence over the source object's value.
        let mut upload_metadata = UploadMetadata {
            acl: self.config.canned_acl.clone(),
            cache_control: if self.config.cache_control.is_none() {
                get_object_output
                    .cache_control()
                    .map(|value| value.to_string())
            } else {
                self.config.cache_control.clone()
            },
            content_disposition: if self.config.content_disposition.is_none() {
                get_object_output
                    .content_disposition()
                    .map(|value| value.to_string())
            } else {
                self.config.content_disposition.clone()
            },
            content_encoding: if self.config.content_encoding.is_none() {
                get_object_output
                    .content_encoding()
                    .map(|value| value.to_string())
            } else {
                self.config.content_encoding.clone()
            },
            content_language: if self.config.content_language.is_none() {
                get_object_output
                    .content_language()
                    .map(|value| value.to_string())
            } else {
                self.config.content_language.clone()
            },
            content_type: if self.config.content_type.is_none() {
                get_object_output
                    .content_type()
                    .map(|value| value.to_string())
            } else {
                self.config.content_type.clone()
            },
            expires: if self.config.expires.is_none() {
                // NOTE(review): `unwrap()` assumes the source's Expires header
                // is always a parseable HTTP date — confirm this cannot panic
                // on malformed responses.
                get_object_output.expires_string().map(|expires_string| {
                    DateTime::from_str(expires_string, DateTimeFormat::HttpDate).unwrap()
                })
            } else {
                Some(DateTime::from_str(
                    &self.config.expires.unwrap().to_rfc3339(),
                    DateTimeFormat::DateTimeWithOffset,
                )?)
            },
            metadata: if self.config.metadata.is_none() {
                get_object_output.metadata().cloned()
            } else {
                self.config.metadata.clone()
            },
            request_payer: self.request_payer.clone(),
            storage_class: storage_class.clone(),
            website_redirect_location: if self.config.website_redirect.is_none() {
                get_object_output
                    .website_redirect_location()
                    .map(|value| value.to_string())
            } else {
                self.config.website_redirect.clone()
            },
            tagging: self.tagging.clone(),
        };

        // Let the user-supplied preprocess callback inspect/mutate the upload
        // metadata. Cancellation skips the upload; other errors abort it.
        let callback_result = self
            .config
            .preprocess_manager
            .execute_preprocessing(key, &get_object_output, &mut upload_metadata)
            .await;
        if let Err(e) = callback_result {
            if is_callback_cancelled(&e) {
                debug!(
                    key = key,
                    "PreprocessCallback execute_preprocessing() cancelled. Skipping upload."
                );
                return Ok(PutObjectOutputBuilder::default().build());
            } else {
                return Err(e.context("PreprocessCallback before_upload() failed."));
            }
        }

        let put_object_output = if self.config.server_side_copy {
            // Server-side copy: replace metadata/tagging with the resolved
            // values (MetadataDirective/TaggingDirective = Replace) and let S3
            // move the bytes itself.
            let copy_source = self
                .source
                .generate_copy_source_key(self.source_key.as_ref(), source_version_id.clone());
            let copy_object_output = self
                .client
                .copy_object()
                .copy_source(copy_source)
                .set_request_payer(upload_metadata.request_payer)
                .set_storage_class(upload_metadata.storage_class)
                .bucket(bucket)
                .key(key)
                .metadata_directive(MetadataDirective::Replace)
                .tagging_directive(TaggingDirective::Replace)
                .set_metadata(upload_metadata.metadata)
                .set_tagging(upload_metadata.tagging)
                .set_website_redirect_location(upload_metadata.website_redirect_location)
                .set_content_type(upload_metadata.content_type)
                .set_content_encoding(upload_metadata.content_encoding)
                .set_cache_control(upload_metadata.cache_control)
                .set_content_disposition(upload_metadata.content_disposition)
                .set_content_language(upload_metadata.content_language)
                .set_expires(upload_metadata.expires)
                .set_server_side_encryption(self.config.sse.clone())
                .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
                .set_sse_customer_algorithm(self.config.target_sse_c.clone())
                .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
                .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
                .set_copy_source_sse_customer_algorithm(self.config.source_sse_c.clone())
                .set_copy_source_sse_customer_key(self.config.source_sse_c_key.clone().key.clone())
                .set_copy_source_sse_customer_key_md5(self.config.source_sse_c_key_md5.clone())
                .set_acl(upload_metadata.acl)
                .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
                .set_copy_source_if_match(self.copy_source_if_match.clone())
                .set_if_none_match(self.if_none_match.clone())
                .send()
                .await?;
            // Account the copied bytes in the sync statistics (best-effort).
            let _ = self
                .stats_sender
                .send_blocking(SyncStatistics::SyncBytes(self.source_total_size));
            convert_copy_to_put_object_output(copy_object_output, self.source_total_size as i64)
        } else {
            // Regular upload: send the in-memory buffer with PutObject.
            let builder = self
                .client
                .put_object()
                .set_request_payer(upload_metadata.request_payer)
                .set_storage_class(upload_metadata.storage_class)
                .bucket(bucket)
                .key(key)
                .content_length(self.source_total_size as i64)
                .body(buffer_stream)
                .set_metadata(upload_metadata.metadata)
                .set_tagging(upload_metadata.tagging)
                .set_website_redirect_location(upload_metadata.website_redirect_location)
                .set_content_md5(md5_digest_base64)
                .set_content_type(upload_metadata.content_type)
                .set_content_encoding(upload_metadata.content_encoding)
                .set_cache_control(upload_metadata.cache_control)
                .set_content_disposition(upload_metadata.content_disposition)
                .set_content_language(upload_metadata.content_language)
                .set_expires(upload_metadata.expires)
                .set_server_side_encryption(self.config.sse.clone())
                .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
                .set_sse_customer_algorithm(self.config.target_sse_c.clone())
                .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
                .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
                .set_acl(upload_metadata.acl)
                .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
                .set_if_match(self.if_match.clone())
                .set_if_none_match(self.if_none_match.clone());

            // Optionally skip payload signing via the request customization API.
            if self.config.disable_payload_signing {
                builder
                    .customize()
                    .disable_payload_signing()
                    .send()
                    .await
                    .context("aws_sdk_s3::client::Client put_object() failed.")?
            } else {
                builder
                    .send()
                    .await
                    .context("aws_sdk_s3::client::Client put_object() failed.")?
            }
        };

        debug!(
            key = &key,
            if_match = &self.if_match.clone(),
            if_none_match = &self.if_none_match.clone(),
            copy_source_if_match = self.copy_source_if_match.clone(),
            "put_object() complete",
        );

        // Report the write event.
        let mut event_data = EventData::new(EventType::SYNC_WRITE);
        event_data.key = Some(key.to_string());
        // skipcq: RS-W1070
        event_data.source_version_id = source_version_id.clone();
        event_data.source_size = Some(self.source_total_size);
        event_data.byte_written = Some(self.source_total_size); // Assuming the size is the same as source
        self.config.event_manager.trigger_event(event_data).await;

        // Report the completion event with source/target identifiers.
        let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
        event_data.key = Some(key.to_string());
        // skipcq: RS-W1070
        event_data.source_version_id = source_version_id.clone();
        event_data.target_version_id = put_object_output.version_id().map(|v| v.to_string());
        // skipcq: RS-W1070
        event_data.source_etag = source_e_tag.clone();
        event_data.source_last_modified = get_object_output.last_modified().copied();
        event_data.source_size = Some(self.source_total_size);
        event_data.target_size = Some(self.source_total_size); // Assuming the size is the same as source
        self.config.event_manager.trigger_event(event_data).await;

        // For local sources (no ETag from storage) synthesize the expected
        // ETag from the accumulated MD5 digest; parts_count 0 = single-part.
        let source_e_tag = if source_local_storage {
            Some(self.generate_e_tag_hash(0))
        } else {
            source_e_tag
        };

        // ETag verification is only meaningful when Content-MD5 was sent and
        // neither side is Express One Zone storage.
        let mut target_e_tag = None;
        if !self.config.disable_etag_verify
            && !self.express_onezone_storage
            && !self.config.disable_content_md5_header
            && source_storage_class != Some(StorageClass::ExpressOnezone)
        {
            let target_sse = put_object_output.server_side_encryption().cloned();
            target_e_tag = put_object_output.e_tag().map(|e| e.to_string());

            self.verify_e_tag(
                key,
                &source_sse,
                source_remote_storage,
                &source_e_tag,
                &target_sse,
                &target_e_tag,
                source_version_id.clone(),
                put_object_output.version_id().map(|v| v.to_string()),
                source_last_modified,
                self.source_total_size,
                self.source_total_size, // Assuming the size is the same as source
            )
            .await;
        }

        // Additional checksum verification (SHA/CRC family), unless disabled.
        if !self.config.disable_additional_checksum_verify {
            let target_checksum = get_additional_checksum_from_put_object_result(
                &put_object_output,
                self.config.additional_checksum_algorithm.as_ref().cloned(),
            );

            self.validate_checksum(
                key,
                source_checksum,
                target_checksum,
                &source_e_tag,
                &target_e_tag,
                source_remote_storage,
                source_version_id,
                put_object_output.version_id().map(|v| v.to_string()),
                source_last_modified,
                self.source_total_size,
                self.source_total_size, // Assuming the size is the same as source
            )
            .await;
        }

        Ok(put_object_output)
    }
1733
    #[allow(clippy::too_many_arguments)]
    /// Compares the source and target additional checksums and reports the
    /// outcome via statistics, events, and logs.
    ///
    /// Outcomes:
    /// - a checksum mode is configured but the source checksum is missing:
    ///   emit a warning and (per the logged message) skip verification;
    /// - both checksums present and mismatching: emit SYNC_CHECKSUM_MISMATCH
    ///   and a warning — unless the source has a multipart ETag and
    ///   `disable_multipart_verify` is set, in which case verification is
    ///   skipped with a debug log;
    /// - both present and equal: emit SYNC_CHECKSUM_VERIFIED and a
    ///   `ChecksumVerified` statistic.
    async fn validate_checksum(
        &mut self,
        key: &str,
        source_checksum: Option<String>,
        target_checksum: Option<String>,
        source_e_tag: &Option<String>,
        target_e_tag: &Option<String>,
        source_remote_storage: bool,
        source_version_id: Option<String>,
        target_version_id: Option<String>,
        source_last_modified: Option<DateTime>,
        source_content_length: u64,
        target_content_length: u64,
    ) {
        // A checksum mode is requested but the source did not yield a
        // checksum: record a warning and raise the warning flag.
        if self.config.additional_checksum_mode.is_some() && source_checksum.is_none() {
            self.send_stats(SyncWarning {
                key: key.to_string(),
            })
            .await;
            self.has_warning.store(true, Ordering::SeqCst);

            let message = "additional checksum algorithm is different from the target storage. skip additional checksum verification.";
            warn!(key = &key, message);

            let mut event_data = EventData::new(EventType::SYNC_WARNING);
            event_data.key = Some(key.to_string());
            // skipcq: RS-W1070
            event_data.source_version_id = source_version_id.clone();
            // skipcq: RS-W1070
            event_data.target_version_id = target_version_id.clone();
            event_data.source_last_modified = source_last_modified;
            // skipcq: RS-W1070
            event_data.source_etag = source_e_tag.clone();
            event_data.source_size = Some(source_content_length);
            // skipcq: RS-W1070
            event_data.target_etag = target_e_tag.clone();
            event_data.target_size = Some(target_content_length);
            event_data.message = Some(message.to_string());
            self.config.event_manager.trigger_event(event_data).await;
        }

        // Only compare when both sides produced a checksum.
        #[allow(clippy::unnecessary_unwrap)]
        if target_checksum.is_some() && source_checksum.is_some() {
            let target_checksum = target_checksum.unwrap();
            let source_checksum = source_checksum.unwrap();

            let additional_checksum_algorithm = self
                .config
                .additional_checksum_algorithm
                .as_ref()
                .unwrap()
                .as_str();

            // Prepare the event payload up front; the event type is decided
            // below depending on match/mismatch.
            let mut event_data = EventData::new(EventType::UNDEFINED);
            event_data.key = Some(key.to_string());
            event_data.source_version_id = source_version_id;
            event_data.target_version_id = target_version_id;
            event_data.source_last_modified = source_last_modified;
            // skipcq: RS-W1070
            event_data.source_etag = source_e_tag.clone();
            event_data.source_size = Some(source_content_length);
            // skipcq: RS-W1070
            event_data.target_etag = target_e_tag.clone();
            event_data.target_size = Some(target_content_length);
            event_data.source_checksum = Some(source_checksum.clone());
            event_data.target_checksum = Some(target_checksum.clone());
            event_data.checksum_algorithm =
                Some(self.config.additional_checksum_algorithm.clone().unwrap());

            if target_checksum != source_checksum {
                // Multipart-ETag sources may legitimately differ when
                // multipart verification is disabled; just log and move on.
                if source_remote_storage
                    && is_multipart_upload_e_tag(source_e_tag)
                    && self.config.disable_multipart_verify
                {
                    debug!(
                        key = &key,
                        additional_checksum_algorithm = additional_checksum_algorithm,
                        target_checksum = target_checksum,
                        source_checksum = source_checksum,
                        "skip additional checksum verification."
                    );
                } else {
                    self.send_stats(SyncWarning {
                        key: key.to_string(),
                    })
                    .await;
                    self.has_warning.store(true, Ordering::SeqCst);

                    // Multipart sources without auto-chunksize get a message
                    // that points at the likely cause (chunk-size mismatch).
                    let message = if source_remote_storage
                        && is_multipart_upload_e_tag(source_e_tag)
                        && !self.is_auto_chunksize_enabled()
                    {
                        format!("{} {}", "additional checksum", MISMATCH_WARNING_WITH_HELP)
                    } else {
                        "additional checksum mismatch. file in the target storage may be corrupted."
                            .to_string()
                    };

                    event_data.event_type = EventType::SYNC_CHECKSUM_MISMATCH;
                    self.config.event_manager.trigger_event(event_data).await;

                    warn!(
                        key = &key,
                        additional_checksum_algorithm = additional_checksum_algorithm,
                        target_checksum = target_checksum,
                        source_checksum = source_checksum,
                        message
                    );
                }
            } else {
                // Checksums match: record the verification.
                self.send_stats(ChecksumVerified {
                    key: key.to_string(),
                })
                .await;

                event_data.event_type = EventType::SYNC_CHECKSUM_VERIFIED;
                self.config.event_manager.trigger_event(event_data).await;

                debug!(
                    key = &key,
                    additional_checksum_algorithm = additional_checksum_algorithm,
                    target_checksum = target_checksum,
                    source_checksum = source_checksum,
                    "additional checksum verified.",
                );
            }
        }
    }
1863
1864    fn generate_e_tag_hash(&self, parts_count: i64) -> String {
1865        generate_e_tag_hash(&self.concatnated_md5_hash, parts_count)
1866    }
1867
1868    fn calculate_parts_count(&self, content_length: i64) -> i64 {
1869        calculate_parts_count(
1870            self.config.transfer_config.multipart_threshold as i64,
1871            self.config.transfer_config.multipart_chunksize as i64,
1872            content_length,
1873        )
1874    }
1875
    /// Sends a statistics entry to the stats channel.
    /// A failed send (e.g. the receiver side is gone) is intentionally
    /// discarded (`let _`) — stats delivery appears to be best-effort and
    /// must not fail the sync.
    async fn send_stats(&self, stats: SyncStatistics) {
        let _ = self.stats_sender.send(stats).await;
    }
1879
1880    fn is_auto_chunksize_enabled(&self) -> bool {
1881        self.config.transfer_config.auto_chunksize && self.object_parts.is_some()
1882    }
1883}
1884
1885fn calculate_parts_count(
1886    multipart_threshold: i64,
1887    multipart_chunksize: i64,
1888    content_length: i64,
1889) -> i64 {
1890    if content_length < multipart_threshold {
1891        return 0;
1892    }
1893
1894    if content_length % multipart_chunksize == 0 {
1895        return content_length / multipart_chunksize;
1896    }
1897
1898    (content_length / multipart_chunksize) + 1
1899}
1900
1901pub fn get_additional_checksum_from_put_object_result(
1902    put_object_output: &PutObjectOutput,
1903    checksum_algorithm: Option<ChecksumAlgorithm>,
1904) -> Option<String> {
1905    checksum_algorithm.as_ref()?;
1906
1907    match checksum_algorithm.unwrap() {
1908        ChecksumAlgorithm::Sha256 => put_object_output
1909            .checksum_sha256()
1910            .map(|checksum| checksum.to_string()),
1911        ChecksumAlgorithm::Sha1 => put_object_output
1912            .checksum_sha1()
1913            .map(|checksum| checksum.to_string()),
1914        ChecksumAlgorithm::Crc32 => put_object_output
1915            .checksum_crc32()
1916            .map(|checksum| checksum.to_string()),
1917        ChecksumAlgorithm::Crc32C => put_object_output
1918            .checksum_crc32_c()
1919            .map(|checksum| checksum.to_string()),
1920        ChecksumAlgorithm::Crc64Nvme => put_object_output
1921            .checksum_crc64_nvme()
1922            .map(|checksum| checksum.to_string()),
1923        _ => {
1924            panic!("unknown algorithm")
1925        }
1926    }
1927}
1928
1929pub fn get_additional_checksum_from_multipart_upload_result(
1930    complete_multipart_upload_result: &CompleteMultipartUploadOutput,
1931    checksum_algorithm: Option<ChecksumAlgorithm>,
1932) -> Option<String> {
1933    checksum_algorithm.as_ref()?;
1934
1935    match checksum_algorithm.unwrap() {
1936        ChecksumAlgorithm::Sha256 => complete_multipart_upload_result
1937            .checksum_sha256()
1938            .map(|checksum| checksum.to_string()),
1939        ChecksumAlgorithm::Sha1 => complete_multipart_upload_result
1940            .checksum_sha1()
1941            .map(|checksum| checksum.to_string()),
1942        ChecksumAlgorithm::Crc32 => complete_multipart_upload_result
1943            .checksum_crc32()
1944            .map(|checksum| checksum.to_string()),
1945        ChecksumAlgorithm::Crc32C => complete_multipart_upload_result
1946            .checksum_crc32_c()
1947            .map(|checksum| checksum.to_string()),
1948        ChecksumAlgorithm::Crc64Nvme => complete_multipart_upload_result
1949            .checksum_crc64_nvme()
1950            .map(|checksum| checksum.to_string()),
1951        _ => {
1952            panic!("unknown algorithm")
1953        }
1954    }
1955}
1956
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_s3::primitives::DateTime;
    use tracing_subscriber::EnvFilter;

    /// Looks up `key` in the object's user metadata; panics when the map or
    /// the entry is missing.
    fn metadata_value<'a>(output: &'a GetObjectOutput, key: &str) -> &'a str {
        output.metadata().unwrap().get(key).unwrap()
    }

    #[test]
    fn update_versioning_metadata_with_new() {
        init_dummy_tracing_subscriber();

        let output = UploadManager::update_versioning_metadata(
            GetObjectOutput::builder()
                .last_modified(DateTime::from_secs(0))
                .version_id("version1")
                .build(),
        );

        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY),
            "version1"
        );
        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY),
            "1970-01-01T00:00:00+00:00"
        );
    }

    #[test]
    fn update_versioning_metadata_with_update() {
        init_dummy_tracing_subscriber();

        let output = UploadManager::update_versioning_metadata(
            GetObjectOutput::builder()
                .last_modified(DateTime::from_secs(0))
                .version_id("version1")
                .metadata("mykey", "myvalue")
                .build(),
        );

        // Pre-existing user metadata must survive the update.
        assert_eq!(metadata_value(&output, "mykey"), "myvalue");
        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY),
            "version1"
        );
        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY),
            "1970-01-01T00:00:00+00:00"
        );
    }

    #[test]
    fn update_versioning_metadata_without_version_id() {
        init_dummy_tracing_subscriber();

        let output = UploadManager::update_versioning_metadata(
            GetObjectOutput::builder()
                .last_modified(DateTime::from_secs(0))
                .metadata("mykey", "myvalue")
                .build(),
        );

        // Without a version id, no versioning entries are added.
        assert_eq!(output.metadata().as_ref().unwrap().len(), 1);
        assert_eq!(metadata_value(&output, "mykey"), "myvalue");
    }

    #[test]
    fn modify_last_modified_metadata_with_new() {
        init_dummy_tracing_subscriber();

        let output = UploadManager::modify_last_modified_metadata(
            GetObjectOutput::builder()
                .last_modified(DateTime::from_secs(0))
                .build(),
        );

        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY),
            "1970-01-01T00:00:00+00:00"
        );
    }

    #[test]
    fn modify_last_modified_metadata_with_update() {
        init_dummy_tracing_subscriber();

        let output = UploadManager::modify_last_modified_metadata(
            GetObjectOutput::builder()
                .last_modified(DateTime::from_secs(0))
                .metadata("key1", "value1")
                .build(),
        );

        assert_eq!(
            metadata_value(&output, S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY),
            "1970-01-01T00:00:00+00:00"
        );
        assert_eq!(metadata_value(&output, "key1"), "value1");
    }

    #[test]
    fn calculate_parts_count_test() {
        init_dummy_tracing_subscriber();

        const CHUNK: i64 = 8 * 1024 * 1024;

        // (content_length, expected part count) with threshold == chunksize.
        let cases = [
            (CHUNK, 1),
            (CHUNK - 1, 0),
            (2 * CHUNK, 2),
            (2 * CHUNK + 1, 3),
        ];

        for (content_length, expected) in cases {
            assert_eq!(calculate_parts_count(CHUNK, CHUNK, content_length), expected);
        }
    }

    fn init_dummy_tracing_subscriber() {
        let filter = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("dummy=trace"))
            .unwrap();

        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
    }
}