1use anyhow::{Context, Result, anyhow};
2use async_channel::Sender;
3use aws_sdk_s3::Client;
4use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput;
5use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
6use aws_sdk_s3::operation::get_object::GetObjectOutput;
7use aws_sdk_s3::operation::put_object::PutObjectOutput;
8use aws_sdk_s3::operation::put_object::builders::PutObjectOutputBuilder;
9use aws_sdk_s3::primitives::ByteStream;
10use aws_sdk_s3::primitives::{DateTime, DateTimeFormat};
11use aws_sdk_s3::types::{
12 ChecksumAlgorithm, ChecksumType, CompletedMultipartUpload, CompletedPart, MetadataDirective,
13 ObjectPart, RequestPayer, ServerSideEncryption, StorageClass, TaggingDirective,
14};
15use aws_smithy_types_convert::date_time::DateTimeExt;
16use base64::{Engine as _, engine::general_purpose};
17use chrono::SecondsFormat;
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Mutex};
21use tokio::io::AsyncReadExt;
22use tokio::task;
23use tokio::task::JoinHandle;
24use tracing::{debug, error, trace, warn};
25
26use crate::config::Config;
27use crate::storage;
28use crate::storage::e_tag_verify::{generate_e_tag_hash, is_multipart_upload_e_tag};
29use crate::storage::{
30 Storage, convert_copy_to_put_object_output, convert_copy_to_upload_part_output,
31 convert_to_buf_byte_stream_with_callback, get_range_from_content_range,
32 parse_range_header_string,
33};
34use crate::types::SyncStatistics::{ChecksumVerified, ETagVerified, SyncWarning};
35use crate::types::error::S3syncError;
36use crate::types::event_callback::{EventData, EventType};
37use crate::types::preprocess_callback::{UploadMetadata, is_callback_cancelled};
38use crate::types::token::PipelineCancellationToken;
39use crate::types::{
40 S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY, S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY, SyncStatistics,
41};
42
43const MISMATCH_WARNING_WITH_HELP: &str = "mismatch. object in the target storage may be corrupted. \
44 or the current multipart_threshold or multipart_chunksize may be different when uploading to the source. \
45 To suppress this warning, please add --disable-multipart-verify command line option. \
46 To resolve this issue, please add --auto-chunksize command line option(but extra API overheads).";
47
48pub struct MutipartEtags {
49 pub digest: Vec<u8>,
50 pub part_number: i32,
51}
52pub struct UploadManager {
53 client: Arc<Client>,
54 config: Config,
55 request_payer: Option<RequestPayer>,
56 cancellation_token: PipelineCancellationToken,
57 stats_sender: Sender<SyncStatistics>,
58 tagging: Option<String>,
59 object_parts: Option<Vec<ObjectPart>>,
60 concatnated_md5_hash: Vec<u8>,
61 express_onezone_storage: bool,
62 source: Storage,
63 source_key: String,
64 source_total_size: u64,
65 source_additional_checksum: Option<String>,
66 if_match: Option<String>,
67 if_none_match: Option<String>,
68 copy_source_if_match: Option<String>,
69 has_warning: Arc<AtomicBool>,
70}
71
72impl UploadManager {
73 #[allow(clippy::too_many_arguments)]
74 pub fn new(
75 client: Arc<Client>,
76 config: Config,
77 request_payer: Option<RequestPayer>,
78 cancellation_token: PipelineCancellationToken,
79 stats_sender: Sender<SyncStatistics>,
80 tagging: Option<String>,
81 object_parts: Option<Vec<ObjectPart>>,
82 express_onezone_storage: bool,
83 source: Storage,
84 source_key: String,
85 source_total_size: u64,
86 source_additional_checksum: Option<String>,
87 if_match: Option<String>,
88 if_none_match: Option<String>,
89 copy_source_if_match: Option<String>,
90 has_warning: Arc<AtomicBool>,
91 ) -> Self {
92 UploadManager {
93 client,
94 config,
95 request_payer,
96 cancellation_token,
97 stats_sender,
98 tagging,
99 object_parts,
100 concatnated_md5_hash: vec![],
101 express_onezone_storage,
102 source,
103 source_key,
104 source_total_size,
105 source_additional_checksum,
106 if_match,
107 if_none_match,
108 copy_source_if_match,
109 has_warning,
110 }
111 }
112
113 pub async fn upload(
114 &mut self,
115 bucket: &str,
116 key: &str,
117 mut get_object_output_first_chunk: GetObjectOutput,
118 ) -> Result<PutObjectOutput> {
119 get_object_output_first_chunk = self.modify_metadata(get_object_output_first_chunk);
120
121 if self.is_auto_chunksize_enabled() {
122 if is_multipart_upload_e_tag(
123 &get_object_output_first_chunk
124 .e_tag()
125 .map(|e_tag| e_tag.to_string()),
126 ) {
127 return self
128 .upload_with_auto_chunksize(bucket, key, get_object_output_first_chunk)
129 .await;
130 }
131
132 let first_chunk_size = self
133 .object_parts
134 .as_ref()
135 .unwrap()
136 .first()
137 .unwrap()
138 .size()
139 .unwrap();
140 if self.source_total_size != first_chunk_size as u64 {
141 return Err(anyhow!(format!(
142 "source_total_size does not match the first object part size: \
143 source_total_size = {}, first object part size = {}",
144 self.source_total_size, first_chunk_size
145 )));
146 }
147
148 let put_object_output = self
150 .singlepart_upload(bucket, key, get_object_output_first_chunk)
151 .await?;
152 trace!(key = key, "{put_object_output:?}");
153 return Ok(put_object_output);
154 }
155
156 let put_object_output = if self
157 .config
158 .transfer_config
159 .is_multipart_upload_required(self.source_total_size)
160 {
161 self.multipart_upload(bucket, key, get_object_output_first_chunk)
162 .await?
163 } else {
164 self.singlepart_upload(bucket, key, get_object_output_first_chunk)
165 .await?
166 };
167
168 trace!(key = key, "{put_object_output:?}");
169 Ok(put_object_output)
170 }
171
172 fn modify_metadata(&self, mut get_object_output: GetObjectOutput) -> GetObjectOutput {
173 if self.config.no_sync_system_metadata {
174 get_object_output = Self::clear_system_meta_data(get_object_output);
175 }
176
177 if self.config.no_sync_user_defined_metadata {
178 get_object_output.metadata = None
179 }
180
181 if self.config.metadata.is_some() {
182 get_object_output.metadata = Some(self.config.metadata.as_ref().unwrap().clone());
183 }
184
185 if self.config.put_last_modified_metadata {
186 get_object_output = Self::modify_last_modified_metadata(get_object_output);
187 }
188
189 if self.config.enable_versioning {
190 get_object_output = Self::update_versioning_metadata(get_object_output);
191 }
192
193 get_object_output
194 }
195
196 fn modify_last_modified_metadata(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
197 let mut metadata = get_object_output.metadata().cloned().unwrap_or_default();
198 let last_modified = DateTime::from_millis(
199 get_object_output
200 .last_modified()
201 .unwrap()
202 .to_millis()
203 .unwrap(),
204 )
205 .to_chrono_utc()
206 .unwrap()
207 .to_rfc3339_opts(SecondsFormat::Secs, false);
208
209 metadata.insert(
210 S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY.to_string(),
211 last_modified,
212 );
213 get_object_output.metadata = Some(metadata);
214
215 get_object_output
216 }
217
218 fn clear_system_meta_data(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
219 get_object_output.content_disposition = None;
220 get_object_output.content_encoding = None;
221 get_object_output.content_language = None;
222 get_object_output.content_type = None;
223 get_object_output.cache_control = None;
224 get_object_output.expires_string = None;
225 get_object_output.website_redirect_location = None;
226
227 get_object_output
228 }
229
230 fn update_versioning_metadata(mut get_object_output: GetObjectOutput) -> GetObjectOutput {
231 if get_object_output.version_id().is_none() {
232 return get_object_output;
233 }
234
235 let source_version_id = get_object_output.version_id().unwrap();
236
237 let mut metadata = get_object_output.metadata().cloned().unwrap_or_default();
238
239 let last_modified = DateTime::from_millis(
240 get_object_output
241 .last_modified()
242 .unwrap()
243 .to_millis()
244 .unwrap(),
245 )
246 .to_chrono_utc()
247 .unwrap()
248 .to_rfc3339_opts(SecondsFormat::Secs, false);
249
250 metadata.insert(
251 S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY.to_string(),
252 source_version_id.to_string(),
253 );
254 metadata.insert(
255 S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY.to_string(),
256 last_modified,
257 );
258
259 get_object_output.metadata = Some(metadata);
260
261 get_object_output
262 }
263
264 pub async fn upload_with_auto_chunksize(
265 &mut self,
266 bucket: &str,
267 key: &str,
268 get_object_output_first_chunk: GetObjectOutput,
269 ) -> Result<PutObjectOutput> {
270 if self.object_parts.as_ref().unwrap().is_empty() {
271 panic!("Illegal object_parts state");
272 }
273
274 let put_object_output = self
275 .multipart_upload(bucket, key, get_object_output_first_chunk)
276 .await?;
277
278 trace!(key = key, "{put_object_output:?}");
279
280 Ok(put_object_output)
281 }
282
283 async fn multipart_upload(
284 &mut self,
285 bucket: &str,
286 key: &str,
287 get_object_output_first_chunk: GetObjectOutput,
288 ) -> Result<PutObjectOutput> {
289 let storage_class = if self.config.storage_class.is_none() {
290 get_object_output_first_chunk.storage_class().cloned()
291 } else {
292 self.config.storage_class.clone()
293 };
294
295 let checksum_type = if self.config.full_object_checksum {
296 Some(ChecksumType::FullObject)
297 } else {
298 None
299 };
300
301 let mut upload_metadata = UploadMetadata {
302 acl: self.config.canned_acl.clone(),
303 cache_control: if self.config.cache_control.is_none() {
304 get_object_output_first_chunk
305 .cache_control()
306 .map(|value| value.to_string())
307 } else {
308 self.config.cache_control.clone()
309 },
310 content_disposition: if self.config.content_disposition.is_none() {
311 get_object_output_first_chunk
312 .content_disposition()
313 .map(|value| value.to_string())
314 } else {
315 self.config.content_disposition.clone()
316 },
317 content_encoding: if self.config.content_encoding.is_none() {
318 get_object_output_first_chunk
319 .content_encoding()
320 .map(|value| value.to_string())
321 } else {
322 self.config.content_encoding.clone()
323 },
324 content_language: if self.config.content_language.is_none() {
325 get_object_output_first_chunk
326 .content_language()
327 .map(|value| value.to_string())
328 } else {
329 self.config.content_language.clone()
330 },
331 content_type: if self.config.content_type.is_none() {
332 get_object_output_first_chunk
333 .content_type()
334 .map(|value| value.to_string())
335 } else {
336 self.config.content_type.clone()
337 },
338 expires: if self.config.expires.is_none() {
339 get_object_output_first_chunk
340 .expires_string()
341 .map(|expires_string| {
342 DateTime::from_str(expires_string, DateTimeFormat::HttpDate).unwrap()
343 })
344 } else {
345 Some(DateTime::from_str(
346 &self.config.expires.unwrap().to_rfc3339(),
347 DateTimeFormat::DateTimeWithOffset,
348 )?)
349 },
350 metadata: if self.config.metadata.is_none() {
351 get_object_output_first_chunk.metadata().cloned()
352 } else {
353 self.config.metadata.clone()
354 },
355 request_payer: self.request_payer.clone(),
356 storage_class: storage_class.clone(),
357 website_redirect_location: if self.config.website_redirect.is_none() {
358 get_object_output_first_chunk
359 .website_redirect_location()
360 .map(|value| value.to_string())
361 } else {
362 self.config.website_redirect.clone()
363 },
364 tagging: self.tagging.clone(),
365 };
366
367 let callback_result = self
368 .config
369 .preprocess_manager
370 .execute_preprocessing(key, &get_object_output_first_chunk, &mut upload_metadata)
371 .await;
372 if let Err(e) = callback_result {
373 if is_callback_cancelled(&e) {
374 debug!(
375 key = key,
376 "PreprocessCallback execute_preprocessing() cancelled. Skipping upload."
377 );
378 return Ok(PutObjectOutputBuilder::default().build());
379 } else {
380 return Err(e.context("PreprocessCallback before_upload() failed."));
381 }
382 }
383
384 let create_multipart_upload_output = self
385 .client
386 .create_multipart_upload()
387 .set_request_payer(upload_metadata.request_payer)
388 .set_storage_class(upload_metadata.storage_class)
389 .bucket(bucket)
390 .key(key)
391 .set_metadata(upload_metadata.metadata)
392 .set_tagging(upload_metadata.tagging)
393 .set_website_redirect_location(upload_metadata.website_redirect_location)
394 .set_content_type(upload_metadata.content_type)
395 .set_content_encoding(upload_metadata.content_encoding)
396 .set_cache_control(upload_metadata.cache_control)
397 .set_content_disposition(upload_metadata.content_disposition)
398 .set_content_language(upload_metadata.content_language)
399 .set_expires(upload_metadata.expires)
400 .set_server_side_encryption(self.config.sse.clone())
401 .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
402 .set_sse_customer_algorithm(self.config.target_sse_c.clone())
403 .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
404 .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
405 .set_acl(upload_metadata.acl)
406 .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
407 .set_checksum_type(checksum_type)
408 .send()
409 .await
410 .context("aws_sdk_s3::client::Client create_multipart_upload() failed.")?;
411 let upload_id = create_multipart_upload_output.upload_id().unwrap();
412
413 let upload_result = self
414 .upload_parts_and_complete(bucket, key, upload_id, get_object_output_first_chunk)
415 .await
416 .context("upload_parts() failed.");
417 if upload_result.is_err() {
418 self.abort_multipart_upload(bucket, key, upload_id).await?;
419 return Err(upload_result.err().unwrap());
420 }
421
422 upload_result
423 }
424
425 #[rustfmt::skip] async fn abort_multipart_upload(&self, bucket: &str, key: &str, upload_id: &str) -> Result<AbortMultipartUploadOutput> {
427 self.client.abort_multipart_upload().set_request_payer(self.request_payer.clone()).bucket(bucket).key(key).upload_id(upload_id).send().await.context("aws_sdk_s3::client::Client abort_multipart_upload() failed.")
428 }
429
430 async fn upload_parts_and_complete(
431 &mut self,
432 bucket: &str,
433 key: &str,
434 upload_id: &str,
435 get_object_output_first_chunk: GetObjectOutput,
436 ) -> Result<PutObjectOutput> {
437 let source_sse = get_object_output_first_chunk
438 .server_side_encryption()
439 .cloned();
440 let source_remote_storage = get_object_output_first_chunk.e_tag().is_some();
441 let source_content_length = self.source_total_size;
442 let source_e_tag = get_object_output_first_chunk
443 .e_tag()
444 .map(|e_tag| e_tag.to_string());
445 let source_local_storage = source_e_tag.is_none();
446 let source_checksum = self.source_additional_checksum.clone();
447 let source_storage_class = get_object_output_first_chunk.storage_class().cloned();
448 let source_version_id = get_object_output_first_chunk
449 .version_id()
450 .map(|v| v.to_string());
451 let source_last_modified = get_object_output_first_chunk.last_modified().copied();
452
453 let upload_parts = if self.is_auto_chunksize_enabled() {
454 self.upload_parts_with_auto_chunksize(
455 bucket,
456 key,
457 upload_id,
458 get_object_output_first_chunk,
459 )
460 .await
461 .context("upload_parts_with_auto_chunksize() failed.")?
462 } else {
463 self.upload_parts(bucket, key, upload_id, get_object_output_first_chunk)
464 .await
465 .context("upload_parts() failed.")?
466 };
467
468 let checksum_type = if self.config.full_object_checksum {
469 Some(ChecksumType::FullObject)
470 } else {
471 None
472 };
473
474 let completed_multipart_upload = CompletedMultipartUpload::builder()
475 .set_parts(Some(upload_parts))
476 .build();
477
478 let complete_multipart_upload_output = self
479 .client
480 .complete_multipart_upload()
481 .set_request_payer(self.request_payer.clone())
482 .bucket(bucket)
483 .key(key)
484 .upload_id(upload_id)
485 .multipart_upload(completed_multipart_upload)
486 .set_sse_customer_algorithm(self.config.target_sse_c.clone())
487 .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
488 .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
489 .set_checksum_type(checksum_type)
490 .set_if_match(self.if_match.clone())
491 .set_if_none_match(self.if_none_match.clone())
492 .send()
493 .await
494 .context("aws_sdk_s3::client::Client complete_multipart_upload() failed.")?;
495
496 trace!(
497 key = key,
498 upload_id = upload_id,
499 if_match = self.if_match.clone(),
500 "{complete_multipart_upload_output:?}"
501 );
502
503 let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
504 event_data.key = Some(key.to_string());
505 event_data.source_version_id = source_version_id.clone();
507 event_data.target_version_id = complete_multipart_upload_output
508 .version_id
509 .clone()
510 .map(|v| v.to_string());
511 event_data.source_etag = source_e_tag.clone();
513 event_data.source_last_modified = source_last_modified;
514 event_data.source_size = Some(self.source_total_size);
515 event_data.target_size = Some(self.source_total_size); self.config.event_manager.trigger_event(event_data).await;
517
518 let source_e_tag = if source_local_storage {
519 Some(self.generate_e_tag_hash(self.calculate_parts_count(source_content_length as i64)))
520 } else {
521 source_e_tag
522 };
523
524 let mut target_e_tag = None;
525 if !self.config.disable_etag_verify
526 && !self.express_onezone_storage
527 && !self.config.disable_content_md5_header
528 && source_storage_class != Some(StorageClass::ExpressOnezone)
529 {
530 let target_sse = complete_multipart_upload_output
531 .server_side_encryption()
532 .cloned();
533 target_e_tag = complete_multipart_upload_output
534 .e_tag()
535 .map(|e| e.to_string());
536
537 self.verify_e_tag(
538 key,
539 &source_sse,
540 source_remote_storage,
541 &source_e_tag,
542 &target_sse,
543 &target_e_tag,
544 source_version_id.clone(),
545 complete_multipart_upload_output
546 .version_id
547 .clone()
548 .map(|v| v.to_string()),
549 source_last_modified,
550 self.source_total_size,
551 self.source_total_size, )
553 .await;
554 }
555
556 if !self.config.disable_additional_checksum_verify {
557 let target_checksum = get_additional_checksum_from_multipart_upload_result(
558 &complete_multipart_upload_output,
559 self.config.additional_checksum_algorithm.clone(),
560 );
561
562 self.validate_checksum(
563 key,
564 source_checksum,
565 target_checksum,
566 &source_e_tag,
567 &target_e_tag,
568 source_remote_storage,
569 source_version_id,
570 complete_multipart_upload_output
571 .version_id
572 .clone()
573 .map(|v| v.to_string()),
574 source_last_modified,
575 self.source_total_size,
576 self.source_total_size, )
578 .await;
579 }
580
581 Ok(PutObjectOutput::builder()
582 .e_tag(complete_multipart_upload_output.e_tag().unwrap())
583 .build())
584 }
585
586 #[allow(clippy::too_many_arguments)]
587 async fn verify_e_tag(
588 &mut self,
589 key: &str,
590 source_sse: &Option<ServerSideEncryption>,
591 source_remote_storage: bool,
592 source_e_tag: &Option<String>,
593 target_sse: &Option<ServerSideEncryption>,
594 target_e_tag: &Option<String>,
595 source_version_id: Option<String>,
596 target_version_id: Option<String>,
597 source_last_modified: Option<DateTime>,
598 source_content_length: u64,
599 target_content_length: u64,
600 ) {
601 let verify_result = storage::e_tag_verify::verify_e_tag(
602 !self.config.disable_multipart_verify,
603 &self.config.source_sse_c,
604 &self.config.target_sse_c,
605 source_sse,
606 source_e_tag,
607 target_sse,
608 target_e_tag,
609 );
610
611 let mut event_data = EventData::new(EventType::UNDEFINED);
612 event_data.key = Some(key.to_string());
613 event_data.source_version_id = source_version_id;
614 event_data.target_version_id = target_version_id;
615 event_data.source_etag = source_e_tag.clone();
617 event_data.target_etag = target_e_tag.clone();
619 event_data.source_last_modified = source_last_modified;
620 event_data.source_size = Some(source_content_length);
621 event_data.target_size = Some(target_content_length);
622
623 if let Some(e_tag_match) = verify_result {
624 if !e_tag_match {
625 if source_remote_storage
626 && is_multipart_upload_e_tag(source_e_tag)
627 && self.config.disable_multipart_verify
628 {
629 debug!(
630 key = &key,
631 source_e_tag = source_e_tag,
632 target_e_tag = target_e_tag,
633 "skip e_tag verification"
634 );
635 } else {
636 let message = if source_remote_storage
637 && is_multipart_upload_e_tag(source_e_tag)
638 && !self.is_auto_chunksize_enabled()
639 {
640 format!("{} {}", "e_tag", MISMATCH_WARNING_WITH_HELP)
641 } else {
642 "e_tag mismatch. file in the target storage may be corrupted.".to_string()
643 };
644
645 self.send_stats(SyncWarning {
646 key: key.to_string(),
647 })
648 .await;
649 self.has_warning.store(true, Ordering::SeqCst);
650
651 event_data.event_type = EventType::SYNC_ETAG_MISMATCH;
652 self.config.event_manager.trigger_event(event_data).await;
653
654 let source_e_tag = source_e_tag.clone().unwrap();
655 let target_e_tag = target_e_tag.clone().unwrap();
656
657 warn!(
658 key = &key,
659 source_e_tag = source_e_tag,
660 target_e_tag = target_e_tag,
661 message
662 );
663 }
664 } else {
665 self.send_stats(ETagVerified {
666 key: key.to_string(),
667 })
668 .await;
669
670 event_data.event_type = EventType::SYNC_ETAG_VERIFIED;
671 self.config.event_manager.trigger_event(event_data).await;
672
673 debug!(
674 key = &key,
675 source_e_tag = source_e_tag,
676 target_e_tag = target_e_tag,
677 "e_tag verified."
678 );
679 }
680 }
681 }
682
683 async fn upload_parts(
685 &mut self,
686 bucket: &str,
687 key: &str,
688 upload_id: &str,
689 get_object_output_first_chunk: GetObjectOutput,
690 ) -> Result<Vec<CompletedPart>> {
691 let shared_source_version_id = get_object_output_first_chunk
692 .version_id()
693 .map(|v| v.to_string());
694 let shared_multipart_etags = Arc::new(Mutex::new(Vec::new()));
695 let shared_upload_parts = Arc::new(Mutex::new(Vec::new()));
696 let shared_total_upload_size = Arc::new(Mutex::new(Vec::new()));
697
698 let first_chunk_size = get_object_output_first_chunk.content_length().unwrap();
699 let config_chunksize = self.config.transfer_config.multipart_chunksize as usize;
700 let source_total_size = self.source_total_size as usize;
701 let source_version_id = get_object_output_first_chunk
702 .version_id()
703 .map(|v| v.to_string());
704
705 let mut body = get_object_output_first_chunk.body.into_async_read();
706
707 let mut upload_parts_join_handles = FuturesUnordered::new();
708 let mut part_number = 1;
709 for offset in (0..self.source_total_size as usize).step_by(config_chunksize) {
710 if self.cancellation_token.is_cancelled() {
711 return Err(anyhow!(S3syncError::Cancelled));
712 }
713
714 let source = dyn_clone::clone_box(&*(self.source));
715 let source_key = self.source_key.clone();
716 let copy_source = if self.config.server_side_copy {
717 self.source
718 .generate_copy_source_key(source_key.as_ref(), source_version_id.clone())
719 } else {
720 "".to_string()
721 };
722 let copy_source_if_match = self.copy_source_if_match.clone();
723 let source_version_id = shared_source_version_id.clone();
724 let source_sse_c = self.config.source_sse_c.clone();
725 let source_sse_c_key = self.config.source_sse_c_key.clone();
726 let source_sse_c_key_string = self.config.source_sse_c_key.clone().key.clone();
727 let source_sse_c_key_md5 = self.config.source_sse_c_key_md5.clone();
728
729 let target = dyn_clone::clone_box(&*(self.client));
730 let target_bucket = bucket.to_string();
731 let target_key = key.to_string();
732 let target_upload_id = upload_id.to_string();
733 let target_sse_c = self.config.target_sse_c.clone();
734 let target_sse_c_key = self.config.target_sse_c_key.clone().key.clone();
735 let target_sse_c_key_md5 = self.config.target_sse_c_key_md5.clone();
736
737 let chunksize = if offset + config_chunksize > source_total_size {
738 source_total_size - offset
739 } else {
740 config_chunksize
741 };
742
743 let upload_parts = Arc::clone(&shared_upload_parts);
744 let multipart_etags = Arc::clone(&shared_multipart_etags);
745 let total_upload_size = Arc::clone(&shared_total_upload_size);
746
747 let additional_checksum_mode = self.config.additional_checksum_mode.clone();
748 let additional_checksum_algorithm = self.config.additional_checksum_algorithm.clone();
749 let disable_payload_signing = self.config.disable_payload_signing;
750 let multipart_chunksize = self.config.transfer_config.multipart_chunksize;
751 let express_onezone_storage = self.express_onezone_storage;
752 let disable_content_md5_header = self.config.disable_content_md5_header;
753 let request_payer = self.request_payer.clone();
754 let server_side_copy = self.config.server_side_copy;
755
756 let stats_sender = self.stats_sender.clone();
757 let event_manager = self.config.event_manager.clone();
758
759 let mut buffer = if !server_side_copy {
760 let mut buffer = Vec::<u8>::with_capacity(multipart_chunksize as usize);
761 buffer.resize_with(chunksize, Default::default);
762 buffer
763 } else {
764 Vec::new() };
766
767 if part_number == 1 && !server_side_copy {
769 let result = body.read_exact(buffer.as_mut_slice()).await;
770 if let Err(e) = result {
771 warn!(
772 key = &source_key,
773 part_number = part_number,
774 "Failed to read data from the body: {e:?}"
775 );
776 return Err(anyhow!(S3syncError::DownloadForceRetryableError));
777 }
778 }
779
780 let permit = self
781 .config
782 .clone()
783 .target_client_config
784 .unwrap()
785 .parallel_upload_semaphore
786 .acquire_owned()
787 .await?;
788 let task: JoinHandle<Result<()>> = task::spawn(async move {
789 let _permit = permit; let range = format!("bytes={}-{}", offset, offset + chunksize - 1);
791
792 debug!(
793 key = &target_key,
794 part_number = part_number,
795 "upload_part() start. range = {range:?}",
796 );
797
798 let upload_size;
799 if part_number > 1 {
801 if !server_side_copy {
802 let get_object_output = source
803 .get_object(
804 &source_key,
805 source_version_id.clone(),
806 additional_checksum_mode,
807 Some(range.clone()),
808 source_sse_c.clone(),
809 source_sse_c_key,
810 source_sse_c_key_md5.clone(),
811 )
812 .await
813 .context("source.get_object() failed.")?;
814 upload_size = get_object_output.content_length().unwrap();
815
816 if get_object_output.content_range().is_none() {
817 error!("get_object() returned no content range. This is unexpected.");
818 return Err(anyhow!(
819 "get_object() returned no content range. This is unexpected. key={}.",
820 &target_key
821 ));
822 }
823 let (request_start, request_end) = parse_range_header_string(&range)
824 .context("failed to parse request range header")?;
825 let (response_start, response_end) =
826 get_range_from_content_range(&get_object_output)
827 .context("get_object() returned no content range")?;
828 if (request_start != response_start) || (request_end != response_end) {
829 return Err(anyhow!(
830 "get_object() returned unexpected content range. \
831 expected: {}-{}, actual: {}-{}",
832 request_start,
833 request_end,
834 response_start,
835 response_end,
836 ));
837 }
838
839 let mut body = convert_to_buf_byte_stream_with_callback(
840 get_object_output.body.into_async_read(),
841 source.get_stats_sender().clone(),
842 source.get_rate_limit_bandwidth(),
843 None,
844 None,
845 )
846 .into_async_read();
847
848 let result = body.read_exact(buffer.as_mut_slice()).await;
849 if let Err(e) = result {
850 warn!(
851 key = &source_key,
852 part_number = part_number,
853 "Failed to read data from the body: {e:?}"
854 );
855 return Err(anyhow!(S3syncError::DownloadForceRetryableError));
856 }
857 } else {
858 upload_size = chunksize as i64;
859 }
860 } else {
861 upload_size = first_chunk_size;
862 }
863
864 let md5_digest;
865 let md5_digest_base64 =
866 if !express_onezone_storage && !disable_content_md5_header && !server_side_copy
867 {
868 let md5_digest_raw = md5::compute(&buffer);
869 md5_digest = Some(md5_digest_raw);
870 Some(general_purpose::STANDARD.encode(md5_digest_raw.as_slice()))
871 } else {
872 md5_digest = None;
873 None
874 };
875
876 let upload_part_output;
877 if !server_side_copy {
878 let builder = target
879 .upload_part()
880 .set_request_payer(request_payer)
881 .bucket(&target_bucket)
882 .key(&target_key)
883 .upload_id(target_upload_id.clone())
884 .part_number(part_number)
885 .set_content_md5(md5_digest_base64)
886 .content_length(chunksize as i64)
887 .set_checksum_algorithm(additional_checksum_algorithm)
888 .set_sse_customer_algorithm(target_sse_c)
889 .set_sse_customer_key(target_sse_c_key)
890 .set_sse_customer_key_md5(target_sse_c_key_md5)
891 .body(ByteStream::from(buffer));
892
893 upload_part_output = if disable_payload_signing {
894 builder
895 .customize()
896 .disable_payload_signing()
897 .send()
898 .await
899 .context("aws_sdk_s3::client::Client upload_part() failed.")?
900 } else {
901 builder
902 .send()
903 .await
904 .context("aws_sdk_s3::client::Client upload_part() failed.")?
905 };
906
907 debug!(
908 key = &target_key,
909 part_number = part_number,
910 "upload_part() complete",
911 );
912
913 trace!(key = &target_key, "{upload_part_output:?}");
914
915 #[allow(clippy::unnecessary_unwrap)]
916 if md5_digest.is_some() {
917 let mut locked_multipart_etags = multipart_etags.lock().unwrap();
918 locked_multipart_etags.push(MutipartEtags {
919 digest: md5_digest.as_ref().unwrap().as_slice().to_vec(),
920 part_number,
921 });
922 }
923 } else {
924 let upload_part_copy_output = target
925 .upload_part_copy()
926 .copy_source(copy_source)
927 .set_request_payer(request_payer)
928 .set_copy_source_range(Some(range))
929 .bucket(&target_bucket)
930 .key(&target_key)
931 .upload_id(target_upload_id.clone())
932 .part_number(part_number)
933 .set_copy_source_sse_customer_algorithm(source_sse_c)
934 .set_copy_source_sse_customer_key(source_sse_c_key_string)
935 .set_copy_source_sse_customer_key_md5(source_sse_c_key_md5)
936 .set_sse_customer_algorithm(target_sse_c)
937 .set_sse_customer_key(target_sse_c_key)
938 .set_sse_customer_key_md5(target_sse_c_key_md5)
939 .set_copy_source_if_match(copy_source_if_match.clone())
940 .send()
941 .await?;
942
943 debug!(
944 key = &target_key,
945 part_number = part_number,
946 copy_source_if_match = copy_source_if_match,
947 "upload_part_copy() complete",
948 );
949
950 trace!(key = &target_key, "{upload_part_copy_output:?}");
951
952 let _ =
953 stats_sender.send_blocking(SyncStatistics::SyncBytes(upload_size as u64));
954
955 upload_part_output =
956 convert_copy_to_upload_part_output(upload_part_copy_output);
957 }
958
959 let mut event_data = EventData::new(EventType::SYNC_WRITE);
960 event_data.key = Some(target_key.clone());
961 event_data.source_version_id = source_version_id.clone();
963 event_data.source_size = Some(source_total_size as u64);
964 event_data.upload_id = Some(target_upload_id.clone());
965 event_data.part_number = Some(part_number as u64);
966 event_data.byte_written = Some(upload_size as u64);
967 event_manager.trigger_event(event_data).await;
968
969 let mut upload_size_vec = total_upload_size.lock().unwrap();
970 upload_size_vec.push(upload_size);
971
972 let mut locked_upload_parts = upload_parts.lock().unwrap();
973 locked_upload_parts.push(
974 CompletedPart::builder()
975 .e_tag(upload_part_output.e_tag().unwrap())
976 .set_checksum_sha256(
977 upload_part_output
978 .checksum_sha256()
979 .map(|digest| digest.to_string()),
980 )
981 .set_checksum_sha1(
982 upload_part_output
983 .checksum_sha1()
984 .map(|digest| digest.to_string()),
985 )
986 .set_checksum_crc32(
987 upload_part_output
988 .checksum_crc32()
989 .map(|digest| digest.to_string()),
990 )
991 .set_checksum_crc32_c(
992 upload_part_output
993 .checksum_crc32_c()
994 .map(|digest| digest.to_string()),
995 )
996 .set_checksum_crc64_nvme(
997 upload_part_output
998 .checksum_crc64_nvme()
999 .map(|digest| digest.to_string()),
1000 )
1001 .part_number(part_number)
1002 .build(),
1003 );
1004
1005 trace!(
1006 key = &target_key,
1007 upload_id = &target_upload_id,
1008 "{locked_upload_parts:?}"
1009 );
1010
1011 Ok(())
1012 });
1013
1014 upload_parts_join_handles.push(task);
1015
1016 part_number += 1;
1017 }
1018
1019 while let Some(result) = upload_parts_join_handles.next().await {
1020 result??;
1021 if self.cancellation_token.is_cancelled() {
1022 return Err(anyhow!(S3syncError::Cancelled));
1023 }
1024 }
1025
1026 let total_upload_size: i64 = shared_total_upload_size.lock().unwrap().iter().sum();
1027 if total_upload_size == self.source_total_size as i64 {
1028 debug!(
1029 key,
1030 total_upload_size, "multipart upload completed successfully."
1031 );
1032 } else {
1033 return Err(anyhow!(format!(
1034 "multipart upload size mismatch: key={key}, expected = {0}, actual {total_upload_size}",
1035 self.source_total_size
1036 )));
1037 }
1038
1039 let mut locked_multipart_etags = shared_multipart_etags.lock().unwrap();
1041 locked_multipart_etags.sort_by_key(|e| e.part_number);
1042 for etag in locked_multipart_etags.iter() {
1043 self.concatnated_md5_hash.append(&mut etag.digest.clone());
1044 }
1045
1046 let mut parts = shared_upload_parts.lock().unwrap().clone();
1048 parts.sort_by_key(|part| part.part_number.unwrap());
1049 Ok(parts)
1050 }
1051
1052 async fn upload_parts_with_auto_chunksize(
1054 &mut self,
1055 bucket: &str,
1056 key: &str,
1057 upload_id: &str,
1058 get_object_output_first_chunk: GetObjectOutput,
1059 ) -> Result<Vec<CompletedPart>> {
1060 let shared_source_version_id = get_object_output_first_chunk
1061 .version_id()
1062 .map(|v| v.to_string());
1063 let shared_multipart_etags = Arc::new(Mutex::new(Vec::new()));
1064 let shared_upload_parts = Arc::new(Mutex::new(Vec::new()));
1065 let shared_total_upload_size = Arc::new(Mutex::new(Vec::new()));
1066
1067 let source_version_id = get_object_output_first_chunk
1068 .version_id()
1069 .map(|v| v.to_string());
1070
1071 let first_chunk_size = get_object_output_first_chunk.content_length().unwrap();
1072 let mut body = get_object_output_first_chunk.body.into_async_read();
1073
1074 let mut upload_parts_join_handles = FuturesUnordered::new();
1075 let mut part_number = 1;
1076 let mut offset = 0;
1077
1078 while part_number <= self.object_parts.as_ref().unwrap().len() as i32 {
1079 if self.cancellation_token.is_cancelled() {
1080 return Err(anyhow!(S3syncError::Cancelled));
1081 }
1082
1083 let source = dyn_clone::clone_box(&*(self.source));
1084 let source_key = self.source_key.clone();
1085 let source_total_size = self.source_total_size as usize;
1086 let copy_source = if self.config.server_side_copy {
1087 self.source
1088 .generate_copy_source_key(source_key.as_ref(), source_version_id.clone())
1089 } else {
1090 "".to_string()
1091 };
1092 let copy_source_if_match = self.copy_source_if_match.clone();
1093 let source_version_id = shared_source_version_id.clone();
1094 let source_sse_c = self.config.source_sse_c.clone();
1095 let source_sse_c_key = self.config.source_sse_c_key.clone();
1096 let source_sse_c_key_string = self.config.source_sse_c_key.clone().key.clone();
1097 let source_sse_c_key_md5 = self.config.source_sse_c_key_md5.clone();
1098
1099 let target = dyn_clone::clone_box(&*(self.client));
1100 let target_bucket = bucket.to_string();
1101 let target_key = key.to_string();
1102 let target_upload_id = upload_id.to_string();
1103 let target_sse_c = self.config.target_sse_c.clone();
1104 let target_sse_c_key = self.config.target_sse_c_key.clone().key.clone();
1105 let target_sse_c_key_md5 = self.config.target_sse_c_key_md5.clone();
1106
1107 let object_part_chunksize = self
1108 .object_parts
1109 .as_ref()
1110 .unwrap()
1111 .get(part_number as usize - 1)
1112 .unwrap()
1113 .size()
1114 .unwrap();
1115
1116 let upload_parts = Arc::clone(&shared_upload_parts);
1117 let multipart_etags = Arc::clone(&shared_multipart_etags);
1118 let total_upload_size = Arc::clone(&shared_total_upload_size);
1119
1120 let additional_checksum_mode = self.config.additional_checksum_mode.clone();
1121 let additional_checksum_algorithm = self.config.additional_checksum_algorithm.clone();
1122 let disable_payload_signing = self.config.disable_payload_signing;
1123 let express_onezone_storage = self.express_onezone_storage;
1124 let disable_content_md5_header = self.config.disable_content_md5_header;
1125 let request_payer = self.request_payer.clone();
1126 let server_side_copy = self.config.server_side_copy;
1127
1128 let stats_sender = self.stats_sender.clone();
1129 let event_manager = self.config.event_manager.clone();
1130
1131 let mut buffer = if !server_side_copy {
1132 let mut buffer = Vec::<u8>::with_capacity(object_part_chunksize as usize);
1133 buffer.resize_with(object_part_chunksize as usize, Default::default);
1134 buffer
1135 } else {
1136 Vec::new() };
1138
1139 if part_number == 1 && !server_side_copy {
1141 let result = body.read_exact(buffer.as_mut_slice()).await;
1142 if let Err(e) = result {
1143 warn!(
1144 key = &source_key,
1145 part_number = part_number,
1146 "Failed to read data from the body: {e:?}"
1147 );
1148 return Err(anyhow!(S3syncError::DownloadForceRetryableError));
1149 }
1150 }
1151
1152 let permit = self
1153 .config
1154 .clone()
1155 .target_client_config
1156 .unwrap()
1157 .parallel_upload_semaphore
1158 .acquire_owned()
1159 .await?;
1160 let task: JoinHandle<Result<()>> = task::spawn(async move {
1161 let _permit = permit; let range = format!("bytes={}-{}", offset, offset + object_part_chunksize - 1);
1163
1164 debug!(
1165 key = &target_key,
1166 part_number = part_number,
1167 "upload_part() start. range = {range:?}",
1168 );
1169
1170 let upload_size;
1171 if part_number > 1 {
1173 if !server_side_copy {
1174 let get_object_output = source
1175 .get_object(
1176 &source_key,
1177 source_version_id.clone(),
1178 additional_checksum_mode,
1179 Some(range.clone()),
1180 source_sse_c.clone(),
1181 source_sse_c_key.clone(),
1182 source_sse_c_key_md5.clone(),
1183 )
1184 .await
1185 .context("source.get_object() failed.")?;
1186 upload_size = get_object_output.content_length().unwrap();
1187
1188 if get_object_output.content_range().is_none() {
1189 error!(
1190 "get_object() - auto-chunksize returned no content range. This is unexpected."
1191 );
1192 return Err(anyhow!(
1193 "get_object() returned no content range. This is unexpected. key={}.",
1194 &target_key
1195 ));
1196 }
1197 let (request_start, request_end) = parse_range_header_string(&range)
1198 .context("failed to parse request range header")?;
1199 let (response_start, response_end) =
1200 get_range_from_content_range(&get_object_output)
1201 .context("get_object() returned no content range")?;
1202 if (request_start != response_start) || (request_end != response_end) {
1203 return Err(anyhow!(
1204 "get_object() - auto-chunksize returned unexpected content range. \
1205 expected: {}-{}, actual: {}-{}",
1206 request_start,
1207 request_end,
1208 response_start,
1209 response_end,
1210 ));
1211 }
1212
1213 let mut body = convert_to_buf_byte_stream_with_callback(
1214 get_object_output.body.into_async_read(),
1215 source.get_stats_sender().clone(),
1216 source.get_rate_limit_bandwidth(),
1217 None,
1218 None,
1219 )
1220 .into_async_read();
1221
1222 let result = body.read_exact(buffer.as_mut_slice()).await;
1223 if let Err(e) = result {
1224 warn!(
1225 key = &source_key,
1226 part_number = part_number,
1227 "Failed to read data from the body: {e:?}"
1228 );
1229 return Err(anyhow!(S3syncError::DownloadForceRetryableError));
1230 }
1231 } else {
1232 upload_size = object_part_chunksize;
1233 }
1234 } else {
1235 upload_size = first_chunk_size;
1236 }
1237
1238 let md5_digest;
1239 let md5_digest_base64 =
1240 if !express_onezone_storage && !disable_content_md5_header && !server_side_copy
1241 {
1242 let md5_digest_raw = md5::compute(&buffer);
1243 md5_digest = Some(md5_digest_raw);
1244 Some(general_purpose::STANDARD.encode(md5_digest_raw.as_slice()))
1245 } else {
1246 md5_digest = None;
1247 None
1248 };
1249
1250 let upload_part_output;
1251 if !server_side_copy {
1252 let builder = target
1253 .upload_part()
1254 .set_request_payer(request_payer)
1255 .bucket(&target_bucket)
1256 .key(&target_key)
1257 .upload_id(target_upload_id.clone())
1258 .part_number(part_number)
1259 .set_content_md5(md5_digest_base64)
1260 .content_length(object_part_chunksize)
1261 .set_checksum_algorithm(additional_checksum_algorithm)
1262 .set_sse_customer_algorithm(target_sse_c)
1263 .set_sse_customer_key(target_sse_c_key)
1264 .set_sse_customer_key_md5(target_sse_c_key_md5)
1265 .body(ByteStream::from(buffer));
1266
1267 upload_part_output = if disable_payload_signing {
1268 builder
1269 .customize()
1270 .disable_payload_signing()
1271 .send()
1272 .await
1273 .context("aws_sdk_s3::client::Client upload_part() failed.")?
1274 } else {
1275 builder
1276 .send()
1277 .await
1278 .context("aws_sdk_s3::client::Client upload_part() failed.")?
1279 };
1280
1281 debug!(
1282 key = &target_key,
1283 part_number = part_number,
1284 "upload_part() complete",
1285 );
1286
1287 trace!(key = &target_key, "{upload_part_output:?}");
1288
1289 if md5_digest.is_some() {
1290 let mut locked_multipart_etags = multipart_etags.lock().unwrap();
1291 #[allow(clippy::unnecessary_unwrap)]
1292 locked_multipart_etags.push(MutipartEtags {
1293 digest: md5_digest.as_ref().unwrap().as_slice().to_vec(),
1294 part_number,
1295 });
1296 }
1297 } else {
1298 let upload_part_copy_output = target
1299 .upload_part_copy()
1300 .copy_source(copy_source)
1301 .set_request_payer(request_payer)
1302 .set_copy_source_range(Some(range))
1303 .bucket(&target_bucket)
1304 .key(&target_key)
1305 .upload_id(target_upload_id.clone())
1306 .part_number(part_number)
1307 .set_copy_source_sse_customer_algorithm(source_sse_c)
1308 .set_copy_source_sse_customer_key(source_sse_c_key_string)
1309 .set_copy_source_sse_customer_key_md5(source_sse_c_key_md5)
1310 .set_sse_customer_algorithm(target_sse_c)
1311 .set_sse_customer_key(target_sse_c_key)
1312 .set_sse_customer_key_md5(target_sse_c_key_md5)
1313 .set_copy_source_if_match(copy_source_if_match.clone())
1314 .send()
1315 .await?;
1316
1317 debug!(
1318 key = &target_key,
1319 part_number = part_number,
1320 copy_source_if_match = copy_source_if_match,
1321 "upload_part_copy() complete",
1322 );
1323
1324 trace!(key = &target_key, "{upload_part_copy_output:?}");
1325
1326 let _ =
1327 stats_sender.send_blocking(SyncStatistics::SyncBytes(upload_size as u64));
1328
1329 upload_part_output =
1330 convert_copy_to_upload_part_output(upload_part_copy_output);
1331 }
1332
1333 let mut event_data = EventData::new(EventType::SYNC_WRITE);
1334 event_data.key = Some(target_key.clone());
1335 event_data.source_version_id = source_version_id.clone();
1337 event_data.source_size = Some(source_total_size as u64);
1338 event_data.upload_id = Some(target_upload_id.clone());
1339 event_data.part_number = Some(part_number as u64);
1340 event_data.byte_written = Some(upload_size as u64);
1341 event_manager.trigger_event(event_data).await;
1342
1343 let mut locked_upload_parts = upload_parts.lock().unwrap();
1344 locked_upload_parts.push(
1345 CompletedPart::builder()
1346 .e_tag(upload_part_output.e_tag().unwrap())
1347 .set_checksum_sha256(
1348 upload_part_output
1349 .checksum_sha256()
1350 .map(|digest| digest.to_string()),
1351 )
1352 .set_checksum_sha1(
1353 upload_part_output
1354 .checksum_sha1()
1355 .map(|digest| digest.to_string()),
1356 )
1357 .set_checksum_crc32(
1358 upload_part_output
1359 .checksum_crc32()
1360 .map(|digest| digest.to_string()),
1361 )
1362 .set_checksum_crc32_c(
1363 upload_part_output
1364 .checksum_crc32_c()
1365 .map(|digest| digest.to_string()),
1366 )
1367 .set_checksum_crc64_nvme(
1368 upload_part_output
1369 .checksum_crc64_nvme()
1370 .map(|digest| digest.to_string()),
1371 )
1372 .part_number(part_number)
1373 .build(),
1374 );
1375
1376 let mut upload_size_vec = total_upload_size.lock().unwrap();
1377 upload_size_vec.push(upload_size);
1378
1379 trace!(
1380 key = &target_key,
1381 upload_id = &target_upload_id,
1382 "{locked_upload_parts:?}"
1383 );
1384
1385 Ok(())
1386 });
1387
1388 upload_parts_join_handles.push(task);
1389
1390 offset += object_part_chunksize;
1391 part_number += 1;
1392 }
1393
1394 while let Some(result) = upload_parts_join_handles.next().await {
1395 result??;
1396 if self.cancellation_token.is_cancelled() {
1397 return Err(anyhow!(S3syncError::Cancelled));
1398 }
1399 }
1400
1401 let total_upload_size: i64 = shared_total_upload_size.lock().unwrap().iter().sum();
1402 if total_upload_size == self.source_total_size as i64 {
1403 debug!(
1404 key,
1405 total_upload_size, "multipart upload(auto-chunksize) completed successfully."
1406 );
1407 } else {
1408 return Err(anyhow!(format!(
1409 "multipart upload(auto-chunksize) size mismatch: key={key}, expected = {0}, actual {total_upload_size}",
1410 self.source_total_size
1411 )));
1412 }
1413
1414 let mut locked_multipart_etags = shared_multipart_etags.lock().unwrap();
1416 locked_multipart_etags.sort_by_key(|e| e.part_number);
1417 for etag in locked_multipart_etags.iter() {
1418 self.concatnated_md5_hash.append(&mut etag.digest.clone());
1419 }
1420
1421 let mut parts = shared_upload_parts.lock().unwrap().clone();
1423 parts.sort_by_key(|part| part.part_number.unwrap());
1424 Ok(parts)
1425 }
1426
1427 async fn singlepart_upload(
1429 &mut self,
1430 bucket: &str,
1431 key: &str,
1432 mut get_object_output: GetObjectOutput,
1433 ) -> Result<PutObjectOutput> {
1434 let source_sse = get_object_output.server_side_encryption().cloned();
1435 let source_remote_storage = get_object_output.e_tag().is_some();
1436 let source_e_tag = get_object_output.e_tag().map(|e_tag| e_tag.to_string());
1437 let source_local_storage = source_e_tag.is_none();
1438 let source_checksum = self.source_additional_checksum.clone();
1439 let source_storage_class = get_object_output.storage_class().cloned();
1440 let source_version_id = get_object_output.version_id().map(|v| v.to_string());
1441 let source_last_modified = get_object_output.last_modified().copied();
1442
1443 let buffer = if !self.config.server_side_copy {
1444 let mut body = get_object_output.body.into_async_read();
1445 get_object_output.body = ByteStream::from_static(b"");
1446
1447 let mut buffer = Vec::<u8>::with_capacity(self.source_total_size as usize);
1448 buffer.resize_with(self.source_total_size as usize, Default::default);
1449
1450 let result = body.read_exact(buffer.as_mut_slice()).await;
1451 if let Err(e) = result {
1452 warn!(key = &key, "Failed to read data from the body: {e:?}");
1453 return Err(anyhow!(S3syncError::DownloadForceRetryableError));
1454 }
1455
1456 buffer
1457 } else {
1458 Vec::new() };
1460
1461 let md5_digest_base64 = if !self.express_onezone_storage
1462 && !self.config.disable_content_md5_header
1463 && !self.config.server_side_copy
1464 {
1465 let md5_digest = md5::compute(&buffer);
1466
1467 self.concatnated_md5_hash
1468 .append(&mut md5_digest.as_slice().to_vec());
1469
1470 Some(general_purpose::STANDARD.encode(md5_digest.as_slice()))
1471 } else {
1472 None
1473 };
1474
1475 let buffer_stream = ByteStream::from(buffer);
1476
1477 let storage_class = if self.config.storage_class.is_none() {
1478 get_object_output.storage_class().cloned()
1479 } else {
1480 self.config.storage_class.clone()
1481 };
1482
1483 let mut upload_metadata = UploadMetadata {
1484 acl: self.config.canned_acl.clone(),
1485 cache_control: if self.config.cache_control.is_none() {
1486 get_object_output
1487 .cache_control()
1488 .map(|value| value.to_string())
1489 } else {
1490 self.config.cache_control.clone()
1491 },
1492 content_disposition: if self.config.content_disposition.is_none() {
1493 get_object_output
1494 .content_disposition()
1495 .map(|value| value.to_string())
1496 } else {
1497 self.config.content_disposition.clone()
1498 },
1499 content_encoding: if self.config.content_encoding.is_none() {
1500 get_object_output
1501 .content_encoding()
1502 .map(|value| value.to_string())
1503 } else {
1504 self.config.content_encoding.clone()
1505 },
1506 content_language: if self.config.content_language.is_none() {
1507 get_object_output
1508 .content_language()
1509 .map(|value| value.to_string())
1510 } else {
1511 self.config.content_language.clone()
1512 },
1513 content_type: if self.config.content_type.is_none() {
1514 get_object_output
1515 .content_type()
1516 .map(|value| value.to_string())
1517 } else {
1518 self.config.content_type.clone()
1519 },
1520 expires: if self.config.expires.is_none() {
1521 get_object_output.expires_string().map(|expires_string| {
1522 DateTime::from_str(expires_string, DateTimeFormat::HttpDate).unwrap()
1523 })
1524 } else {
1525 Some(DateTime::from_str(
1526 &self.config.expires.unwrap().to_rfc3339(),
1527 DateTimeFormat::DateTimeWithOffset,
1528 )?)
1529 },
1530 metadata: if self.config.metadata.is_none() {
1531 get_object_output.metadata().cloned()
1532 } else {
1533 self.config.metadata.clone()
1534 },
1535 request_payer: self.request_payer.clone(),
1536 storage_class: storage_class.clone(),
1537 website_redirect_location: if self.config.website_redirect.is_none() {
1538 get_object_output
1539 .website_redirect_location()
1540 .map(|value| value.to_string())
1541 } else {
1542 self.config.website_redirect.clone()
1543 },
1544 tagging: self.tagging.clone(),
1545 };
1546
1547 let callback_result = self
1548 .config
1549 .preprocess_manager
1550 .execute_preprocessing(key, &get_object_output, &mut upload_metadata)
1551 .await;
1552 if let Err(e) = callback_result {
1553 if is_callback_cancelled(&e) {
1554 debug!(
1555 key = key,
1556 "PreprocessCallback execute_preprocessing() cancelled. Skipping upload."
1557 );
1558 return Ok(PutObjectOutputBuilder::default().build());
1559 } else {
1560 return Err(e.context("PreprocessCallback before_upload() failed."));
1561 }
1562 }
1563
1564 let put_object_output = if self.config.server_side_copy {
1565 let copy_source = self
1566 .source
1567 .generate_copy_source_key(self.source_key.as_ref(), source_version_id.clone());
1568 let copy_object_output = self
1569 .client
1570 .copy_object()
1571 .copy_source(copy_source)
1572 .set_request_payer(upload_metadata.request_payer)
1573 .set_storage_class(upload_metadata.storage_class)
1574 .bucket(bucket)
1575 .key(key)
1576 .metadata_directive(MetadataDirective::Replace)
1577 .tagging_directive(TaggingDirective::Replace)
1578 .set_metadata(upload_metadata.metadata)
1579 .set_tagging(upload_metadata.tagging)
1580 .set_website_redirect_location(upload_metadata.website_redirect_location)
1581 .set_content_type(upload_metadata.content_type)
1582 .set_content_encoding(upload_metadata.content_encoding)
1583 .set_cache_control(upload_metadata.cache_control)
1584 .set_content_disposition(upload_metadata.content_disposition)
1585 .set_content_language(upload_metadata.content_language)
1586 .set_expires(upload_metadata.expires)
1587 .set_server_side_encryption(self.config.sse.clone())
1588 .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
1589 .set_sse_customer_algorithm(self.config.target_sse_c.clone())
1590 .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
1591 .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
1592 .set_copy_source_sse_customer_algorithm(self.config.source_sse_c.clone())
1593 .set_copy_source_sse_customer_key(self.config.source_sse_c_key.clone().key.clone())
1594 .set_copy_source_sse_customer_key_md5(self.config.source_sse_c_key_md5.clone())
1595 .set_acl(upload_metadata.acl)
1596 .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
1597 .set_copy_source_if_match(self.copy_source_if_match.clone())
1598 .set_if_none_match(self.if_none_match.clone())
1599 .send()
1600 .await?;
1601 let _ = self
1602 .stats_sender
1603 .send_blocking(SyncStatistics::SyncBytes(self.source_total_size));
1604 convert_copy_to_put_object_output(copy_object_output, self.source_total_size as i64)
1605 } else {
1606 let builder = self
1607 .client
1608 .put_object()
1609 .set_request_payer(upload_metadata.request_payer)
1610 .set_storage_class(upload_metadata.storage_class)
1611 .bucket(bucket)
1612 .key(key)
1613 .content_length(self.source_total_size as i64)
1614 .body(buffer_stream)
1615 .set_metadata(upload_metadata.metadata)
1616 .set_tagging(upload_metadata.tagging)
1617 .set_website_redirect_location(upload_metadata.website_redirect_location)
1618 .set_content_md5(md5_digest_base64)
1619 .set_content_type(upload_metadata.content_type)
1620 .set_content_encoding(upload_metadata.content_encoding)
1621 .set_cache_control(upload_metadata.cache_control)
1622 .set_content_disposition(upload_metadata.content_disposition)
1623 .set_content_language(upload_metadata.content_language)
1624 .set_expires(upload_metadata.expires)
1625 .set_server_side_encryption(self.config.sse.clone())
1626 .set_ssekms_key_id(self.config.sse_kms_key_id.clone().id.clone())
1627 .set_sse_customer_algorithm(self.config.target_sse_c.clone())
1628 .set_sse_customer_key(self.config.target_sse_c_key.clone().key.clone())
1629 .set_sse_customer_key_md5(self.config.target_sse_c_key_md5.clone())
1630 .set_acl(upload_metadata.acl)
1631 .set_checksum_algorithm(self.config.additional_checksum_algorithm.as_ref().cloned())
1632 .set_if_match(self.if_match.clone())
1633 .set_if_none_match(self.if_none_match.clone());
1634
1635 if self.config.disable_payload_signing {
1636 builder
1637 .customize()
1638 .disable_payload_signing()
1639 .send()
1640 .await
1641 .context("aws_sdk_s3::client::Client put_object() failed.")?
1642 } else {
1643 builder
1644 .send()
1645 .await
1646 .context("aws_sdk_s3::client::Client put_object() failed.")?
1647 }
1648 };
1649
1650 debug!(
1651 key = &key,
1652 if_match = &self.if_match.clone(),
1653 if_none_match = &self.if_none_match.clone(),
1654 copy_source_if_match = self.copy_source_if_match.clone(),
1655 "put_object() complete",
1656 );
1657
1658 let mut event_data = EventData::new(EventType::SYNC_WRITE);
1659 event_data.key = Some(key.to_string());
1660 event_data.source_version_id = source_version_id.clone();
1662 event_data.source_size = Some(self.source_total_size);
1663 event_data.byte_written = Some(self.source_total_size); self.config.event_manager.trigger_event(event_data).await;
1665
1666 let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
1667 event_data.key = Some(key.to_string());
1668 event_data.source_version_id = source_version_id.clone();
1670 event_data.target_version_id = put_object_output.version_id().map(|v| v.to_string());
1671 event_data.source_etag = source_e_tag.clone();
1673 event_data.source_last_modified = get_object_output.last_modified().copied();
1674 event_data.source_size = Some(self.source_total_size);
1675 event_data.target_size = Some(self.source_total_size); self.config.event_manager.trigger_event(event_data).await;
1677
1678 let source_e_tag = if source_local_storage {
1679 Some(self.generate_e_tag_hash(0))
1680 } else {
1681 source_e_tag
1682 };
1683
1684 let mut target_e_tag = None;
1685 if !self.config.disable_etag_verify
1686 && !self.express_onezone_storage
1687 && !self.config.disable_content_md5_header
1688 && source_storage_class != Some(StorageClass::ExpressOnezone)
1689 {
1690 let target_sse = put_object_output.server_side_encryption().cloned();
1691 target_e_tag = put_object_output.e_tag().map(|e| e.to_string());
1692
1693 self.verify_e_tag(
1694 key,
1695 &source_sse,
1696 source_remote_storage,
1697 &source_e_tag,
1698 &target_sse,
1699 &target_e_tag,
1700 source_version_id.clone(),
1701 put_object_output.version_id().map(|v| v.to_string()),
1702 source_last_modified,
1703 self.source_total_size,
1704 self.source_total_size, )
1706 .await;
1707 }
1708
1709 if !self.config.disable_additional_checksum_verify {
1710 let target_checksum = get_additional_checksum_from_put_object_result(
1711 &put_object_output,
1712 self.config.additional_checksum_algorithm.as_ref().cloned(),
1713 );
1714
1715 self.validate_checksum(
1716 key,
1717 source_checksum,
1718 target_checksum,
1719 &source_e_tag,
1720 &target_e_tag,
1721 source_remote_storage,
1722 source_version_id,
1723 put_object_output.version_id().map(|v| v.to_string()),
1724 source_last_modified,
1725 self.source_total_size,
1726 self.source_total_size, )
1728 .await;
1729 }
1730
1731 Ok(put_object_output)
1732 }
1733
1734 #[allow(clippy::too_many_arguments)]
1735 async fn validate_checksum(
1736 &mut self,
1737 key: &str,
1738 source_checksum: Option<String>,
1739 target_checksum: Option<String>,
1740 source_e_tag: &Option<String>,
1741 target_e_tag: &Option<String>,
1742 source_remote_storage: bool,
1743 source_version_id: Option<String>,
1744 target_version_id: Option<String>,
1745 source_last_modified: Option<DateTime>,
1746 source_content_length: u64,
1747 target_content_length: u64,
1748 ) {
1749 if self.config.additional_checksum_mode.is_some() && source_checksum.is_none() {
1750 self.send_stats(SyncWarning {
1751 key: key.to_string(),
1752 })
1753 .await;
1754 self.has_warning.store(true, Ordering::SeqCst);
1755
1756 let message = "additional checksum algorithm is different from the target storage. skip additional checksum verification.";
1757 warn!(key = &key, message);
1758
1759 let mut event_data = EventData::new(EventType::SYNC_WARNING);
1760 event_data.key = Some(key.to_string());
1761 event_data.source_version_id = source_version_id.clone();
1763 event_data.target_version_id = target_version_id.clone();
1765 event_data.source_last_modified = source_last_modified;
1766 event_data.source_etag = source_e_tag.clone();
1768 event_data.source_size = Some(source_content_length);
1769 event_data.target_etag = target_e_tag.clone();
1771 event_data.target_size = Some(target_content_length);
1772 event_data.message = Some(message.to_string());
1773 self.config.event_manager.trigger_event(event_data).await;
1774 }
1775
1776 #[allow(clippy::unnecessary_unwrap)]
1777 if target_checksum.is_some() && source_checksum.is_some() {
1778 let target_checksum = target_checksum.unwrap();
1779 let source_checksum = source_checksum.unwrap();
1780
1781 let additional_checksum_algorithm = self
1782 .config
1783 .additional_checksum_algorithm
1784 .as_ref()
1785 .unwrap()
1786 .as_str();
1787
1788 let mut event_data = EventData::new(EventType::UNDEFINED);
1789 event_data.key = Some(key.to_string());
1790 event_data.source_version_id = source_version_id;
1791 event_data.target_version_id = target_version_id;
1792 event_data.source_last_modified = source_last_modified;
1793 event_data.source_etag = source_e_tag.clone();
1795 event_data.source_size = Some(source_content_length);
1796 event_data.target_etag = target_e_tag.clone();
1798 event_data.target_size = Some(target_content_length);
1799 event_data.source_checksum = Some(source_checksum.clone());
1800 event_data.target_checksum = Some(target_checksum.clone());
1801 event_data.checksum_algorithm =
1802 Some(self.config.additional_checksum_algorithm.clone().unwrap());
1803
1804 if target_checksum != source_checksum {
1805 if source_remote_storage
1806 && is_multipart_upload_e_tag(source_e_tag)
1807 && self.config.disable_multipart_verify
1808 {
1809 debug!(
1810 key = &key,
1811 additional_checksum_algorithm = additional_checksum_algorithm,
1812 target_checksum = target_checksum,
1813 source_checksum = source_checksum,
1814 "skip additional checksum verification."
1815 );
1816 } else {
1817 self.send_stats(SyncWarning {
1818 key: key.to_string(),
1819 })
1820 .await;
1821 self.has_warning.store(true, Ordering::SeqCst);
1822
1823 let message = if source_remote_storage
1824 && is_multipart_upload_e_tag(source_e_tag)
1825 && !self.is_auto_chunksize_enabled()
1826 {
1827 format!("{} {}", "additional checksum", MISMATCH_WARNING_WITH_HELP)
1828 } else {
1829 "additional checksum mismatch. file in the target storage may be corrupted."
1830 .to_string()
1831 };
1832
1833 event_data.event_type = EventType::SYNC_CHECKSUM_MISMATCH;
1834 self.config.event_manager.trigger_event(event_data).await;
1835
1836 warn!(
1837 key = &key,
1838 additional_checksum_algorithm = additional_checksum_algorithm,
1839 target_checksum = target_checksum,
1840 source_checksum = source_checksum,
1841 message
1842 );
1843 }
1844 } else {
1845 self.send_stats(ChecksumVerified {
1846 key: key.to_string(),
1847 })
1848 .await;
1849
1850 event_data.event_type = EventType::SYNC_CHECKSUM_VERIFIED;
1851 self.config.event_manager.trigger_event(event_data).await;
1852
1853 debug!(
1854 key = &key,
1855 additional_checksum_algorithm = additional_checksum_algorithm,
1856 target_checksum = target_checksum,
1857 source_checksum = source_checksum,
1858 "additional checksum verified.",
1859 );
1860 }
1861 }
1862 }
1863
1864 fn generate_e_tag_hash(&self, parts_count: i64) -> String {
1865 generate_e_tag_hash(&self.concatnated_md5_hash, parts_count)
1866 }
1867
1868 fn calculate_parts_count(&self, content_length: i64) -> i64 {
1869 calculate_parts_count(
1870 self.config.transfer_config.multipart_threshold as i64,
1871 self.config.transfer_config.multipart_chunksize as i64,
1872 content_length,
1873 )
1874 }
1875
1876 async fn send_stats(&self, stats: SyncStatistics) {
1877 let _ = self.stats_sender.send(stats).await;
1878 }
1879
1880 fn is_auto_chunksize_enabled(&self) -> bool {
1881 self.config.transfer_config.auto_chunksize && self.object_parts.is_some()
1882 }
1883}
1884
1885fn calculate_parts_count(
1886 multipart_threshold: i64,
1887 multipart_chunksize: i64,
1888 content_length: i64,
1889) -> i64 {
1890 if content_length < multipart_threshold {
1891 return 0;
1892 }
1893
1894 if content_length % multipart_chunksize == 0 {
1895 return content_length / multipart_chunksize;
1896 }
1897
1898 (content_length / multipart_chunksize) + 1
1899}
1900
1901pub fn get_additional_checksum_from_put_object_result(
1902 put_object_output: &PutObjectOutput,
1903 checksum_algorithm: Option<ChecksumAlgorithm>,
1904) -> Option<String> {
1905 checksum_algorithm.as_ref()?;
1906
1907 match checksum_algorithm.unwrap() {
1908 ChecksumAlgorithm::Sha256 => put_object_output
1909 .checksum_sha256()
1910 .map(|checksum| checksum.to_string()),
1911 ChecksumAlgorithm::Sha1 => put_object_output
1912 .checksum_sha1()
1913 .map(|checksum| checksum.to_string()),
1914 ChecksumAlgorithm::Crc32 => put_object_output
1915 .checksum_crc32()
1916 .map(|checksum| checksum.to_string()),
1917 ChecksumAlgorithm::Crc32C => put_object_output
1918 .checksum_crc32_c()
1919 .map(|checksum| checksum.to_string()),
1920 ChecksumAlgorithm::Crc64Nvme => put_object_output
1921 .checksum_crc64_nvme()
1922 .map(|checksum| checksum.to_string()),
1923 _ => {
1924 panic!("unknown algorithm")
1925 }
1926 }
1927}
1928
1929pub fn get_additional_checksum_from_multipart_upload_result(
1930 complete_multipart_upload_result: &CompleteMultipartUploadOutput,
1931 checksum_algorithm: Option<ChecksumAlgorithm>,
1932) -> Option<String> {
1933 checksum_algorithm.as_ref()?;
1934
1935 match checksum_algorithm.unwrap() {
1936 ChecksumAlgorithm::Sha256 => complete_multipart_upload_result
1937 .checksum_sha256()
1938 .map(|checksum| checksum.to_string()),
1939 ChecksumAlgorithm::Sha1 => complete_multipart_upload_result
1940 .checksum_sha1()
1941 .map(|checksum| checksum.to_string()),
1942 ChecksumAlgorithm::Crc32 => complete_multipart_upload_result
1943 .checksum_crc32()
1944 .map(|checksum| checksum.to_string()),
1945 ChecksumAlgorithm::Crc32C => complete_multipart_upload_result
1946 .checksum_crc32_c()
1947 .map(|checksum| checksum.to_string()),
1948 ChecksumAlgorithm::Crc64Nvme => complete_multipart_upload_result
1949 .checksum_crc64_nvme()
1950 .map(|checksum| checksum.to_string()),
1951 _ => {
1952 panic!("unknown algorithm")
1953 }
1954 }
1955}
1956
1957#[cfg(test)]
1958mod tests {
1959 use super::*;
1960 use aws_sdk_s3::primitives::DateTime;
1961 use tracing_subscriber::EnvFilter;
1962
1963 #[test]
1964 fn update_versioning_metadata_with_new() {
1965 init_dummy_tracing_subscriber();
1966
1967 let mut get_object_output = GetObjectOutput::builder()
1968 .last_modified(DateTime::from_secs(0))
1969 .version_id("version1")
1970 .build();
1971
1972 get_object_output = UploadManager::update_versioning_metadata(get_object_output);
1973 assert_eq!(
1974 get_object_output
1975 .metadata()
1976 .as_ref()
1977 .unwrap()
1978 .get(S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY)
1979 .unwrap(),
1980 "version1"
1981 );
1982 assert_eq!(
1983 get_object_output
1984 .metadata()
1985 .as_ref()
1986 .unwrap()
1987 .get(S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY)
1988 .unwrap(),
1989 "1970-01-01T00:00:00+00:00"
1990 );
1991 }
1992
1993 #[test]
1994 fn update_versioning_metadata_with_update() {
1995 init_dummy_tracing_subscriber();
1996
1997 let mut get_object_output = GetObjectOutput::builder()
1998 .last_modified(DateTime::from_secs(0))
1999 .version_id("version1")
2000 .metadata("mykey", "myvalue")
2001 .build();
2002
2003 get_object_output = UploadManager::update_versioning_metadata(get_object_output);
2004 assert_eq!(
2005 get_object_output
2006 .metadata()
2007 .as_ref()
2008 .unwrap()
2009 .get("mykey")
2010 .unwrap(),
2011 "myvalue"
2012 );
2013 assert_eq!(
2014 get_object_output
2015 .metadata()
2016 .as_ref()
2017 .unwrap()
2018 .get(S3SYNC_ORIGIN_VERSION_ID_METADATA_KEY)
2019 .unwrap(),
2020 "version1"
2021 );
2022 assert_eq!(
2023 get_object_output
2024 .metadata()
2025 .as_ref()
2026 .unwrap()
2027 .get(S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY)
2028 .unwrap(),
2029 "1970-01-01T00:00:00+00:00"
2030 );
2031 }
2032
2033 #[test]
2034 fn update_versioning_metadata_without_version_id() {
2035 init_dummy_tracing_subscriber();
2036
2037 let mut get_object_output = GetObjectOutput::builder()
2038 .last_modified(DateTime::from_secs(0))
2039 .metadata("mykey", "myvalue")
2040 .build();
2041
2042 get_object_output = UploadManager::update_versioning_metadata(get_object_output);
2043 assert_eq!(get_object_output.metadata().as_ref().unwrap().len(), 1);
2044 assert_eq!(
2045 get_object_output.metadata().unwrap().get("mykey").unwrap(),
2046 "myvalue"
2047 );
2048 }
2049
2050 #[test]
2051 fn modify_last_modified_metadata_with_new() {
2052 init_dummy_tracing_subscriber();
2053
2054 let mut get_object_output = GetObjectOutput::builder()
2055 .last_modified(DateTime::from_secs(0))
2056 .build();
2057 get_object_output = UploadManager::modify_last_modified_metadata(get_object_output);
2058
2059 assert_eq!(
2060 get_object_output
2061 .metadata()
2062 .unwrap()
2063 .get(S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY)
2064 .unwrap(),
2065 "1970-01-01T00:00:00+00:00"
2066 );
2067 }
2068
2069 #[test]
2070 fn modify_last_modified_metadata_with_update() {
2071 init_dummy_tracing_subscriber();
2072
2073 let mut get_object_output = GetObjectOutput::builder()
2074 .last_modified(DateTime::from_secs(0))
2075 .metadata("key1", "value1")
2076 .build();
2077 get_object_output = UploadManager::modify_last_modified_metadata(get_object_output);
2078
2079 assert_eq!(
2080 get_object_output
2081 .metadata()
2082 .unwrap()
2083 .get(S3SYNC_ORIGIN_LAST_MODIFIED_METADATA_KEY)
2084 .unwrap(),
2085 "1970-01-01T00:00:00+00:00"
2086 );
2087 assert_eq!(
2088 get_object_output.metadata().unwrap().get("key1").unwrap(),
2089 "value1"
2090 );
2091 }
2092
2093 #[test]
2094 fn calculate_parts_count_test() {
2095 init_dummy_tracing_subscriber();
2096
2097 assert_eq!(
2098 calculate_parts_count(8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024),
2099 1
2100 );
2101
2102 assert_eq!(
2103 calculate_parts_count(8 * 1024 * 1024, 8 * 1024 * 1024, (8 * 1024 * 1024) - 1),
2104 0
2105 );
2106
2107 assert_eq!(
2108 calculate_parts_count(8 * 1024 * 1024, 8 * 1024 * 1024, 16 * 1024 * 1024),
2109 2
2110 );
2111
2112 assert_eq!(
2113 calculate_parts_count(8 * 1024 * 1024, 8 * 1024 * 1024, (16 * 1024 * 1024) + 1),
2114 3
2115 );
2116 }
2117
2118 fn init_dummy_tracing_subscriber() {
2119 let _ = tracing_subscriber::fmt()
2120 .with_env_filter(
2121 EnvFilter::try_from_default_env()
2122 .or_else(|_| EnvFilter::try_new("dummy=trace"))
2123 .unwrap(),
2124 )
2125 .try_init();
2126 }
2127}