use anyhow::{Context, Result, anyhow};
use async_channel::Sender;
use async_trait::async_trait;
use aws_sdk_s3::Client;
use aws_sdk_s3::operation::delete_object::DeleteObjectOutput;
use aws_sdk_s3::operation::delete_object_tagging::DeleteObjectTaggingOutput;
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use aws_sdk_s3::operation::get_object_tagging::GetObjectTaggingOutput;
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use aws_sdk_s3::operation::put_object::PutObjectOutput;
use aws_sdk_s3::operation::put_object_tagging::PutObjectTaggingOutput;
use aws_sdk_s3::types::builders::ObjectPartBuilder;
use aws_sdk_s3::types::{
    BucketVersioningStatus, ChecksumMode, DeleteMarkerEntry, ObjectAttributes, ObjectPart,
    ObjectVersion, RequestPayer, Tagging,
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use leaky_bucket::RateLimiter;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, info};

use crate::Config;
use crate::config::ClientConfig;
use crate::storage::checksum::AdditionalChecksum;
use crate::storage::s3::upload_manager::UploadManager;
use crate::storage::{
    Storage, StorageFactory, StorageTrait, convert_head_to_get_object_output,
    convert_to_buf_byte_stream_with_callback,
};
use crate::types::SyncStatistics::{SyncBytes, SyncSkip};
use crate::types::event_callback::{EventData, EventType};
use crate::types::token::PipelineCancellationToken;
use crate::types::{
    ObjectChecksum, ObjectVersions, S3syncObject, SseCustomerKey, StoragePath, SyncStatistics,
    clone_object_version_with_key, get_additional_checksum, is_full_object_checksum,
};

const EXPRESS_ONEZONE_STORAGE_SUFFIX: &str = "--x-s3";

mod client_builder;
mod upload_manager;

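/// Builds an S3-backed [`Storage`] instance from the pipeline configuration.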
pub struct S3StorageFactory {}

#[async_trait]
impl StorageFactory for S3StorageFactory {
    async fn create(
        config: Config,
        path: StoragePath,
        cancellation_token: PipelineCancellationToken,
        stats_sender: Sender<SyncStatistics>,
        client_config: Option<ClientConfig>,
        request_payer: Option<RequestPayer>,
        rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
        rate_limit_bandwidth: Option<Arc<RateLimiter>>,
        has_warning: Arc<AtomicBool>,
        _object_to_list: Option<String>,
    ) -> Storage {
        S3Storage::boxed_new(
            config,
            path,
            cancellation_token,
            stats_sender,
            Some(Arc::new(
                client_config.as_ref().unwrap().create_client().await,
            )),
            request_payer,
            rate_limit_objects_per_sec,
            rate_limit_bandwidth,
            has_warning,
        )
        .await
    }
}

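/// S3 storage backend used as a sync source or target.
/// `Clone` is derived so that each parallel listing worker can own a handle;
/// shared state (client, rate limiters, listing semaphore) lives behind `Arc`.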
#[derive(Clone)]
struct S3Storage {
    config: Config,
    bucket: String,
    prefix: String,
    cancellation_token: PipelineCancellationToken,
    client: Option<Arc<Client>>,
    request_payer: Option<RequestPayer>,
    stats_sender: Sender<SyncStatistics>,
    rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
    rate_limit_bandwidth: Option<Arc<RateLimiter>>,
    has_warning: Arc<AtomicBool>,
    listing_worker_semaphore: Arc<Semaphore>,
}

impl S3Storage {
    #[allow(clippy::too_many_arguments)]
    async fn boxed_new(
        config: Config,
        path: StoragePath,
        cancellation_token: PipelineCancellationToken,
        stats_sender: Sender<SyncStatistics>,
        client: Option<Arc<Client>>,
        request_payer: Option<RequestPayer>,
        rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
        rate_limit_bandwidth: Option<Arc<RateLimiter>>,
        has_warning: Arc<AtomicBool>,
    ) -> Storage {
        let (bucket, prefix) = if let StoragePath::S3 { bucket, prefix } = path {
            (bucket, prefix)
        } else {
            panic!("s3 path not found")
        };

        let max_parallel_listings: usize = config.max_parallel_listings as usize;
        let storage = S3Storage {
            config,
            bucket,
            prefix,
            cancellation_token,
            client,
            request_payer,
            stats_sender,
            rate_limit_objects_per_sec,
            rate_limit_bandwidth,
            has_warning,
            listing_worker_semaphore: Arc::new(Semaphore::new(max_parallel_listings)),
        };

        Box::new(storage)
    }

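    /// Collects delete markers into the per-key version map. When no point-in-time
    /// is configured, only the latest delete marker of each key is kept.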
    async fn aggregate_delete_markers(
        &self,
        delete_marker_entries: &[DeleteMarkerEntry],
        s3sync_object_map: &mut HashMap<String, Vec<S3syncObject>>,
    ) {
        for delete_marker in delete_marker_entries {
            if self.config.point_in_time.is_none() && !delete_marker.is_latest().unwrap() {
                continue;
            }

            let key_without_prefix = remove_s3_prefix(delete_marker.key().unwrap(), &self.prefix);
            if key_without_prefix.is_empty() {
                let mut event_data = EventData::new(EventType::SYNC_FILTERED);
                event_data.key = Some(delete_marker.key().unwrap().to_string());
                event_data.source_version_id = delete_marker.version_id.clone();
                event_data.message =
                    Some("Key that is the same as the prefix is skipped.".to_string());
                self.config.event_manager.trigger_event(event_data).await;

                self.send_stats(SyncSkip {
                    key: delete_marker.key().unwrap().to_string(),
                })
                .await;

                let key = delete_marker.key().unwrap();
                debug!(key = key, "Key that is the same as the prefix is skipped.");

                continue;
            }

            let delete_marker_object =
                S3syncObject::clone_delete_marker_with_key(delete_marker, &key_without_prefix);

            s3sync_object_map
                .entry(key_without_prefix)
                .or_insert_with(ObjectVersions::new)
                .push(delete_marker_object);
        }
    }

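    /// Groups object versions by key and, relying on `ListObjectVersions` returning
    /// keys in order, flushes a key's accumulated versions (sorted) as soon as the
    /// listing moves on to the next key.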
    async fn aggregate_object_versions_and_send(
        &self,
        sender: &Sender<S3syncObject>,
        object_versions: &[ObjectVersion],
        s3sync_object_map: &mut HashMap<String, ObjectVersions>,
    ) -> Result<()> {
        let mut previous_key = "".to_string();
        for object in object_versions {
            let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
            if key_without_prefix.is_empty() {
                let mut event_data = EventData::new(EventType::SYNC_FILTERED);
                event_data.key = object.key().map(|k| k.to_string());
                event_data.source_version_id = object.version_id.clone();
                event_data.message =
                    Some("Key that is the same as the prefix is skipped.".to_string());
                self.config.event_manager.trigger_event(event_data).await;

                self.send_stats(SyncSkip {
                    key: object.key().unwrap().to_string(),
                })
                .await;

                let key = object.key().unwrap();
                debug!(key = key, "Key that is the same as the prefix is skipped.");

                continue;
            }

            if !previous_key.is_empty() && previous_key != key_without_prefix {
                Self::send_object_versions_with_sort(
                    sender,
                    &mut s3sync_object_map.remove(&previous_key).unwrap(),
                )
                .await?;
            }

            let versioning_object =
                S3syncObject::clone_versioning_object_with_key(object, &key_without_prefix);

            s3sync_object_map
                .entry(key_without_prefix.clone())
                .or_insert_with(ObjectVersions::new)
                .push(versioning_object);

            previous_key = key_without_prefix;
        }

        Ok(())
    }

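    /// Sends a key's versions oldest-first, with the latest version always last.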
    async fn send_object_versions_with_sort(
        sender: &Sender<S3syncObject>,
        object_versions: &mut ObjectVersions,
    ) -> Result<()> {
        object_versions.sort_by(|a, b| {
            if a.is_latest() {
                return Ordering::Greater;
            }
            if b.is_latest() {
                return Ordering::Less;
            }

            a.last_modified()
                .as_nanos()
                .cmp(&b.last_modified().as_nanos())
        });

        for object in object_versions {
            debug!(
                key = object.key(),
                "list_object_versions(): sending remote object."
            );
            if let Err(e) = sender
                .send(object.clone())
                .await
                .context("async_channel::Sender::send() failed.")
            {
                return if !sender.is_closed() { Err(e) } else { Ok(()) };
            }
        }

        Ok(())
    }

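    /// Waits for an objects-per-second token if a rate limiter is configured.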
    async fn exec_rate_limit_objects_per_sec(&self) {
        if let Some(rate_limiter) = self.rate_limit_objects_per_sec.as_ref() {
            rate_limiter.acquire(1).await;
        }
    }

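    /// Lists objects recursively. Up to `max_parallel_listing_max_depth`, the listing
    /// uses `/` as a delimiter and spawns one worker per common prefix, bounded by
    /// `listing_worker_semaphore`; beyond that depth it falls back to a flat,
    /// non-delimited listing. The current permit is released before child permits are
    /// acquired. Returns a boxed future because `async fn` cannot be directly recursive.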
    fn list_objects_with_parallel<'a>(
        &'a self,
        prefix: &'a str,
        sender: &'a Sender<S3syncObject>,
        max_keys: i32,
        current_depth: usize,
        permit: tokio::sync::OwnedSemaphorePermit,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async move {
            let is_root_prefix = prefix.is_empty();
            let prefix = if is_root_prefix {
                self.prefix.clone()
            } else {
                prefix.to_string()
            };

            let delimiter = if current_depth <= self.config.max_parallel_listing_max_depth as usize
            {
                Some("/".to_string())
            } else {
                None
            };

            let mut current_permit = Some(permit);

            debug!(
                root_prefix = self.prefix,
                prefix = prefix.as_str(),
                "Start listing objects."
            );

            let mut continuation_token = None;
            loop {
                self.exec_rate_limit_objects_per_sec().await;

                let list_object_v2 = self
                    .client
                    .as_ref()
                    .unwrap()
                    .list_objects_v2()
                    .set_request_payer(self.request_payer.clone())
                    .bucket(&self.bucket)
                    .prefix(&prefix)
                    .set_delimiter(delimiter.clone())
                    .set_continuation_token(continuation_token)
                    .max_keys(max_keys);

                if self.cancellation_token.is_cancelled() {
                    debug!("list_objects() canceled.");
                    break;
                }

                let list_objects_output = list_object_v2.send().await?;

                for object in list_objects_output.contents() {
                    let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
                    if key_without_prefix.is_empty() {
                        let mut event_data = EventData::new(EventType::SYNC_FILTERED);
                        event_data.key = object.key().map(|k| k.to_string());
                        event_data.message =
                            Some("Key that is the same as the prefix is skipped.".to_string());
                        self.config.event_manager.trigger_event(event_data).await;

                        self.send_stats(SyncSkip {
                            key: object.key().unwrap().to_string(),
                        })
                        .await;

                        let key = object.key().unwrap();
                        debug!(key = key, "Key that is the same as the prefix is skipped.");

                        continue;
                    }

                    let non_versioning_object = S3syncObject::clone_non_versioning_object_with_key(
                        object,
                        &key_without_prefix,
                    );

                    debug!(key = object.key(), "list_objects(): sending remote object.");
                    if let Err(e) = sender
                        .send(non_versioning_object.clone())
                        .await
                        .context("async_channel::Sender::send() failed.")
                    {
                        error!("Failed to send object: {}", e);
                        return if !sender.is_closed() { Err(e) } else { Ok(()) };
                    }
                }

                if let Some(common_prefixes) = list_objects_output.common_prefixes.clone() {
                    let mut join_set = JoinSet::new();
                    for common_prefix in common_prefixes {
                        if self.cancellation_token.is_cancelled() {
                            debug!("list_objects() canceled.");
                            break;
                        }

                        if let Some(sub_prefix) = common_prefix.prefix() {
                            let storage = self.clone();
                            let sub_prefix = sub_prefix.to_string();
                            let sender = sender.clone();

                            debug!(
                                root_prefix = self.prefix,
                                prefix = prefix.as_str(),
                                sub_prefix = sub_prefix.as_str(),
                                "Start listing objects in sub-prefix."
                            );

                            if let Some(permit) = current_permit {
                                drop(permit);
                                current_permit = None;
                            }

                            let new_permit = self
                                .listing_worker_semaphore
                                .clone()
                                .acquire_owned()
                                .await
                                .unwrap();

                            join_set.spawn(async move {
                                storage
                                    .list_objects_with_parallel(
                                        &sub_prefix,
                                        &sender,
                                        max_keys,
                                        current_depth + 1,
                                        new_permit,
                                    )
                                    .await
                                    .context("Failed to list objects in sub-prefix.")
                            });
                        }
                    }

                    while let Some(join_result) = join_set.join_next().await {
                        if let Err(join_error) = join_result {
                            error!("Failed to join in sub-prefix: {}", join_error);
                            self.cancellation_token.cancel();
                            return Err(anyhow!(join_error));
                        }

                        if let Err(task_error) = join_result.unwrap() {
                            error!("Failed to list objects in sub-prefix: {}", task_error);
                            self.cancellation_token.cancel();
                            return Err(anyhow!(task_error));
                        }
                    }
                }

                if !list_objects_output.is_truncated().unwrap() {
                    break;
                }

                continuation_token = list_objects_output
                    .next_continuation_token()
                    .map(|token| token.to_string());
            }

            if let Some(permit) = current_permit {
                drop(permit);
            }

            Ok(())
        })
    }
}

#[async_trait]
impl StorageTrait for S3Storage {
    fn is_local_storage(&self) -> bool {
        false
    }

    fn is_express_onezone_storage(&self) -> bool {
        is_express_onezone_storage(&self.bucket)
    }

    async fn list_objects(
        &self,
        sender: &Sender<S3syncObject>,
        max_keys: i32,
        _warn_as_error: bool,
    ) -> Result<()> {
        if self.config.max_parallel_listings > 1 {
            if !self.is_express_onezone_storage() {
                debug!(
                    "Using parallel listing with {} workers.",
                    self.config.max_parallel_listings
                );
                let permit = self
                    .listing_worker_semaphore
                    .clone()
                    .acquire_owned()
                    .await
                    .unwrap();

                return self
                    .list_objects_with_parallel("", sender, max_keys, 1, permit)
                    .await
                    .context("Failed to list objects in parallel.");
            } else if self.config.allow_parallel_listings_in_express_one_zone {
                debug!(
                    "Using parallel listing with {} workers (S3 Express One Zone).",
                    self.config.max_parallel_listings
                );

                let permit = self
                    .listing_worker_semaphore
                    .clone()
                    .acquire_owned()
                    .await
                    .unwrap();

                return self
                    .list_objects_with_parallel("", sender, max_keys, 1, permit)
                    .await
                    .context("Failed to list objects in parallel.");
            }
        }

        debug!("Parallel listing disabled.");

        let mut continuation_token = None;
        loop {
            let list_object_v2 = self
                .client
                .as_ref()
                .unwrap()
                .list_objects_v2()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .prefix(&self.prefix)
                .set_continuation_token(continuation_token)
                .max_keys(max_keys);

            if self.cancellation_token.is_cancelled() {
                debug!("list_objects() canceled.");
                break;
            }

            let list_objects_output = list_object_v2
                .send()
                .await
                .context("aws_sdk_s3::client::list_objects_v2() failed.")?;

            for object in list_objects_output.contents() {
                let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
                if key_without_prefix.is_empty() {
                    let mut event_data = EventData::new(EventType::SYNC_FILTERED);
                    event_data.key = object.key().map(|k| k.to_string());
                    event_data.message =
                        Some("Key that is the same as the prefix is skipped.".to_string());
                    self.config.event_manager.trigger_event(event_data).await;

                    self.send_stats(SyncSkip {
                        key: object.key().unwrap().to_string(),
                    })
                    .await;

                    let key = object.key().unwrap();
                    debug!(key = key, "Key that is the same as the prefix is skipped.");

                    continue;
                }

                let non_versioning_object =
                    S3syncObject::clone_non_versioning_object_with_key(object, &key_without_prefix);

                debug!(key = object.key(), "list_objects(): sending remote object.");
                if let Err(e) = sender
                    .send(non_versioning_object.clone())
                    .await
                    .context("async_channel::Sender::send() failed.")
                {
                    return if !sender.is_closed() { Err(e) } else { Ok(()) };
                }
            }

            if !list_objects_output.is_truncated().unwrap() {
                break;
            }

            continuation_token = list_objects_output
                .next_continuation_token()
                .map(|token| token.to_string());
        }

        Ok(())
    }

    async fn list_object_versions(
        &self,
        sender: &Sender<S3syncObject>,
        max_keys: i32,
        _warn_as_error: bool,
    ) -> Result<()> {
        let mut key_marker = None;
        let mut version_id_marker = None;

        let mut s3sync_versioning_map = HashMap::new();

        loop {
            let list_object_versions = self
                .client
                .as_ref()
                .unwrap()
                .list_object_versions()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .prefix(&self.prefix)
                .set_key_marker(key_marker)
                .set_version_id_marker(version_id_marker)
                .max_keys(max_keys);

            if self.cancellation_token.is_cancelled() {
                debug!("list_object_versions() canceled.");
                break;
            }

            let list_object_versions_output = list_object_versions
                .send()
                .await
                .context("aws_sdk_s3::client::list_object_versions() failed.")?;

            self.aggregate_delete_markers(
                list_object_versions_output.delete_markers(),
                &mut s3sync_versioning_map,
            )
            .await;

            self.aggregate_object_versions_and_send(
                sender,
                list_object_versions_output.versions(),
                &mut s3sync_versioning_map,
            )
            .await?;

            if !list_object_versions_output.is_truncated().unwrap() {
                break;
            }

            key_marker = list_object_versions_output
                .next_key_marker()
                .map(|marker| marker.to_string());
            version_id_marker = list_object_versions_output
                .next_version_id_marker()
                .map(|marker| marker.to_string());
        }

        for versioning_objects in s3sync_versioning_map.values_mut() {
            Self::send_object_versions_with_sort(sender, versioning_objects).await?;
        }

        Ok(())
    }

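    /// In dry-run mode this issues a `HeadObject` request instead and converts the
    /// result into a `GetObjectOutput`, so the pipeline can proceed without
    /// downloading the object body.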
    async fn get_object(
        &self,
        key: &str,
        version_id: Option<String>,
        checksum_mode: Option<ChecksumMode>,
        range: Option<String>,
        sse_c: Option<String>,
        sse_c_key: SseCustomerKey,
        sse_c_key_md5: Option<String>,
    ) -> Result<GetObjectOutput> {
        if self.config.dry_run {
            let head_object_result = self
                .client
                .as_ref()
                .unwrap()
                .head_object()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .key(generate_full_key(&self.prefix, key))
                .set_version_id(version_id)
                .set_checksum_mode(checksum_mode)
                .set_range(range)
                .set_sse_customer_algorithm(sse_c)
                .set_sse_customer_key(sse_c_key.key.clone())
                .set_sse_customer_key_md5(sse_c_key_md5)
                .send()
                .await
                .context("aws_sdk_s3::client::head_object() failed.")?;

            return Ok(convert_head_to_get_object_output(head_object_result));
        }

        let result = self
            .client
            .as_ref()
            .unwrap()
            .get_object()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(generate_full_key(&self.prefix, key))
            .set_version_id(version_id)
            .set_checksum_mode(checksum_mode)
            .set_range(range)
            .set_sse_customer_algorithm(sse_c)
            .set_sse_customer_key(sse_c_key.key.clone())
            .set_sse_customer_key_md5(sse_c_key_md5)
            .send()
            .await
            .context("aws_sdk_s3::client::get_object() failed.")?;

        Ok(result)
    }

    async fn get_object_versions(&self, key: &str, max_keys: i32) -> Result<Vec<ObjectVersion>> {
        let mut key_marker = None;
        let mut version_id_marker = None;

        let mut object_versions = Vec::new();

        let key = generate_full_key(&self.prefix, key);
        let key_without_prefix = remove_s3_prefix(&key, &self.prefix);

        loop {
            let list_object_versions = self
                .client
                .as_ref()
                .unwrap()
                .list_object_versions()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .prefix(&key)
                .set_key_marker(key_marker)
                .set_version_id_marker(version_id_marker)
                .max_keys(max_keys);

            if self.cancellation_token.is_cancelled() {
                debug!("list_object_versions() canceled.");
                break;
            }

            let list_object_versions_output = list_object_versions
                .send()
                .await
                .context("aws_sdk_s3::client::list_object_versions() failed.")?;

            object_versions.append(
                &mut list_object_versions_output
                    .versions()
                    .iter()
                    .filter(|&object| object.key().unwrap() == key)
                    .cloned()
                    .map(|object| clone_object_version_with_key(&object, &key_without_prefix))
                    .collect(),
            );

            if !list_object_versions_output.is_truncated().unwrap() {
                break;
            }

            key_marker = list_object_versions_output
                .next_key_marker()
                .map(|marker| marker.to_string());
            version_id_marker = list_object_versions_output
                .next_version_id_marker()
                .map(|marker| marker.to_string());
        }

        Ok(object_versions)
    }

    async fn get_object_tagging(
        &self,
        key: &str,
        version_id: Option<String>,
    ) -> Result<GetObjectTaggingOutput> {
        let result = self
            .client
            .as_ref()
            .unwrap()
            .get_object_tagging()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(generate_full_key(&self.prefix, key))
            .set_version_id(version_id)
            .send()
            .await
            .context("aws_sdk_s3::client::get_object_tagging() failed.")?;

        Ok(result)
    }

    async fn head_object(
        &self,
        key: &str,
        version_id: Option<String>,
        checksum_mode: Option<ChecksumMode>,
        range: Option<String>,
        sse_c: Option<String>,
        sse_c_key: SseCustomerKey,
        sse_c_key_md5: Option<String>,
    ) -> Result<HeadObjectOutput> {
        let result = self
            .client
            .as_ref()
            .unwrap()
            .head_object()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(generate_full_key(&self.prefix, key))
            .set_range(range)
            .set_version_id(version_id)
            .set_checksum_mode(checksum_mode)
            .set_sse_customer_algorithm(sse_c)
            .set_sse_customer_key(sse_c_key.key.clone())
            .set_sse_customer_key_md5(sse_c_key_md5)
            .send()
            .await
            .context("aws_sdk_s3::client::head_object() failed.")?;

        Ok(result)
    }

    async fn head_object_first_part(
        &self,
        key: &str,
        version_id: Option<String>,
        checksum_mode: Option<ChecksumMode>,
        sse_c: Option<String>,
        sse_c_key: SseCustomerKey,
        sse_c_key_md5: Option<String>,
    ) -> Result<HeadObjectOutput> {
        let result = self
            .client
            .as_ref()
            .unwrap()
            .head_object()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(generate_full_key(&self.prefix, key))
            .set_version_id(version_id)
            .part_number(1)
            .set_checksum_mode(checksum_mode)
            .set_sse_customer_algorithm(sse_c)
            .set_sse_customer_key(sse_c_key.key.clone())
            .set_sse_customer_key_md5(sse_c_key_md5)
            .send()
            .await
            .context("aws_sdk_s3::client::head_object() failed.")?;

        Ok(result)
    }

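    /// Reconstructs the part sizes of a multipart object by issuing one `HeadObject`
    /// request per part number. Returns an empty vector for non-multipart objects.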
    async fn get_object_parts(
        &self,
        key: &str,
        version_id: Option<String>,
        sse_c: Option<String>,
        sse_c_key: SseCustomerKey,
        sse_c_key_md5: Option<String>,
    ) -> Result<Vec<ObjectPart>> {
        let object = self
            .client
            .as_ref()
            .unwrap()
            .head_object()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(generate_full_key(&self.prefix, key))
            .set_version_id(version_id.clone())
            .part_number(1)
            .set_sse_customer_algorithm(sse_c.clone())
            .set_sse_customer_key(sse_c_key.key.clone())
            .set_sse_customer_key_md5(sse_c_key_md5.clone())
            .send()
            .await
            .context("aws_sdk_s3::client::head_object() failed.")?;

        let mut object_parts = vec![];

        let parts_count = object.parts_count().unwrap_or_default();
        if parts_count == 0 {
            return Ok(vec![]);
        }

        object_parts.push(
            ObjectPartBuilder::default()
                .size(object.content_length().unwrap())
                .build(),
        );

        for part_number in 2..=parts_count {
            let object = self
                .client
                .as_ref()
                .unwrap()
                .head_object()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .key(generate_full_key(&self.prefix, key))
                .set_version_id(version_id.clone())
                .part_number(part_number)
                .set_sse_customer_algorithm(sse_c.clone())
                .set_sse_customer_key(sse_c_key.key.clone())
                .set_sse_customer_key_md5(sse_c_key_md5.clone())
                .send()
                .await
                .context("aws_sdk_s3::client::head_object() failed.")?;

            object_parts.push(
                ObjectPartBuilder::default()
                    .size(object.content_length().unwrap())
                    .build(),
            );
        }

        Ok(object_parts)
    }

    async fn get_object_parts_attributes(
        &self,
        key: &str,
        version_id: Option<String>,
        max_parts: i32,
        sse_c: Option<String>,
        sse_c_key: SseCustomerKey,
        sse_c_key_md5: Option<String>,
    ) -> Result<Vec<ObjectPart>> {
        let mut object_parts = vec![];
        let mut part_number_marker = None;
        loop {
            let object = self
                .client
                .as_ref()
                .unwrap()
                .get_object_attributes()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .key(generate_full_key(&self.prefix, key))
                .set_version_id(version_id.clone())
                .object_attributes(ObjectAttributes::ObjectParts)
                .set_part_number_marker(part_number_marker)
                .set_sse_customer_algorithm(sse_c.clone())
                .set_sse_customer_key(sse_c_key.key.clone())
                .set_sse_customer_key_md5(sse_c_key_md5.clone())
                .max_parts(max_parts)
                .send()
                .await
                .context("aws_sdk_s3::client::get_object_attributes() failed.")?;

            if object.object_parts().is_none() || object.object_parts().unwrap().parts().is_empty()
            {
                return Ok(vec![]);
            }

            for part in object.object_parts().unwrap().parts() {
                object_parts.push(part.clone());
            }

            if !object.object_parts().unwrap().is_truncated().unwrap() {
                break;
            }

            part_number_marker = object
                .object_parts()
                .unwrap()
                .next_part_number_marker()
                .map(|marker| marker.to_string());
        }

        Ok(object_parts)
    }

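    /// Streams the first `GetObject` chunk through a callback stream that reports
    /// transfer statistics, applies the bandwidth limit, and feeds the additional
    /// checksum calculation, then hands the upload to `UploadManager`. In dry-run
    /// mode, only statistics and a `SYNC_COMPLETE` event are emitted.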
    async fn put_object(
        &self,
        key: &str,
        source: Storage,
        source_size: u64,
        source_additional_checksum: Option<String>,
        mut get_object_output_first_chunk: GetObjectOutput,
        tagging: Option<String>,
        object_checksum: Option<ObjectChecksum>,
        if_match: Option<String>,
        copy_source_if_match: Option<String>,
    ) -> Result<PutObjectOutput> {
        let mut version_id = "".to_string();
        if let Some(source_version_id) = get_object_output_first_chunk.version_id().as_ref() {
            version_id = source_version_id.to_string();
        }
        let target_key = generate_full_key(&self.prefix, key);
        let source_key = key;
        let source_last_modified = aws_smithy_types::DateTime::from_millis(
            get_object_output_first_chunk
                .last_modified()
                .unwrap()
                .to_millis()?,
        )
        .to_chrono_utc()?
        .to_rfc3339();

        if self.config.dry_run {
            self.send_stats(SyncBytes(source_size)).await;

            let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
            event_data.key = Some(key.to_string());
            event_data.source_version_id = get_object_output_first_chunk
                .version_id()
                .as_ref()
                .map(|v| v.to_string());
            event_data.source_last_modified =
                get_object_output_first_chunk.last_modified().copied();
            event_data.source_etag = get_object_output_first_chunk.e_tag().map(|e| e.to_string());
            event_data.source_size = Some(source_size);
            event_data.target_size = Some(source_size);
            self.config.event_manager.trigger_event(event_data).await;

            info!(
                key = key,
                source_version_id = version_id,
                source_last_modified = source_last_modified,
                target_key = target_key,
                size = source_size.to_string(),
                if_match = if_match.clone(),
                copy_source_if_match = copy_source_if_match.clone(),
                "[dry-run] sync completed.",
            );

            return Ok(PutObjectOutput::builder().build());
        }

        let additional_checksum_value = get_additional_checksum(
            &get_object_output_first_chunk,
            object_checksum.as_ref().unwrap().checksum_algorithm.clone(),
        );
        let full_object_checksum = is_full_object_checksum(&additional_checksum_value);
        #[allow(clippy::unnecessary_unwrap)]
        let checksum = if object_checksum.is_some()
            && object_checksum
                .as_ref()
                .unwrap()
                .checksum_algorithm
                .is_some()
            && !self.config.full_object_checksum
            && !full_object_checksum
        {
            Some(Arc::new(AdditionalChecksum::new(
                object_checksum
                    .as_ref()
                    .unwrap()
                    .checksum_algorithm
                    .as_ref()
                    .unwrap()
                    .clone(),
                self.config.full_object_checksum,
            )))
        } else {
            None
        };

        get_object_output_first_chunk.body = convert_to_buf_byte_stream_with_callback(
            get_object_output_first_chunk.body.into_async_read(),
            self.get_stats_sender(),
            self.rate_limit_bandwidth.clone(),
            checksum,
            object_checksum.clone(),
        );

        let mut upload_manager = UploadManager::new(
            self.client.clone().unwrap(),
            self.config.clone(),
            self.request_payer.clone(),
            self.cancellation_token.clone(),
            self.get_stats_sender(),
            tagging,
            object_checksum.unwrap_or_default().object_parts,
            self.is_express_onezone_storage(),
            source,
            source_key.to_string(),
            source_size,
            source_additional_checksum,
            if_match,
            copy_source_if_match,
            self.has_warning.clone(),
        );

        self.exec_rate_limit_objects_per_sec().await;

        let put_object_output = upload_manager
            .upload(&self.bucket, &target_key, get_object_output_first_chunk)
            .await?;

        if put_object_output.e_tag.is_some() {
            info!(
                key = key,
                source_version_id = version_id,
                source_last_modified = source_last_modified,
                target_key = target_key,
                size = source_size,
                "sync completed.",
            );
        }

        Ok(put_object_output)
    }

    async fn put_object_tagging(
        &self,
        key: &str,
        version_id: Option<String>,
        tagging: Tagging,
    ) -> Result<PutObjectTaggingOutput> {
        let target_key = generate_full_key(&self.prefix, key);
        let version_id_str = version_id.clone().unwrap_or_default();

        if self.config.dry_run {
            info!(
                key = key,
                target_version_id = version_id_str,
                target_key = target_key,
                "[dry-run] sync(tagging only) completed.",
            );

            return Ok(PutObjectTaggingOutput::builder().build());
        }

        self.exec_rate_limit_objects_per_sec().await;

        let result = self
            .client
            .as_ref()
            .unwrap()
            .put_object_tagging()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(&target_key)
            .set_version_id(version_id.clone())
            .tagging(tagging)
            .send()
            .await
            .context("aws_sdk_s3::client::put_object_tagging() failed.")?;

        info!(
            key = key,
            target_version_id = version_id_str,
            target_key = target_key,
            "sync(tagging only) completed.",
        );

        Ok(result)
    }

    async fn delete_object(
        &self,
        key: &str,
        version_id: Option<String>,
        if_match: Option<String>,
    ) -> Result<DeleteObjectOutput> {
        let target_key = generate_full_key(&self.prefix, key);
        let version_id_str = version_id.clone().unwrap_or_default();

        if self.config.dry_run {
            info!(
                key = key,
                target_version_id = version_id_str,
                target_key = target_key,
                if_match = if_match.clone(),
                "[dry-run] delete completed.",
            );

            return Ok(DeleteObjectOutput::builder().build());
        }

        self.exec_rate_limit_objects_per_sec().await;

        let result = self
            .client
            .as_ref()
            .unwrap()
            .delete_object()
            .set_request_payer(self.request_payer.clone())
            .bucket(&self.bucket)
            .key(&target_key)
            .set_version_id(version_id.clone())
            .set_if_match(if_match.clone())
            .send()
            .await
            .context("aws_sdk_s3::client::delete_object() failed.")?;

        info!(
            key = key,
            target_version_id = version_id_str,
            target_key = target_key,
            if_match = if_match.clone(),
            "delete completed.",
        );

        Ok(result)
    }

    async fn delete_object_tagging(
        &self,
        key: &str,
        version_id: Option<String>,
    ) -> Result<DeleteObjectTaggingOutput> {
        let target_key = generate_full_key(&self.prefix, key);
        let version_id_str = version_id.clone().unwrap_or_default();

        if self.config.dry_run {
            info!(
                key = key,
                target_version_id = version_id_str,
                target_key = target_key,
                "[dry-run] sync(delete tagging only) completed.",
            );

            return Ok(DeleteObjectTaggingOutput::builder().build());
        }

        self.exec_rate_limit_objects_per_sec().await;

        let result = self
            .client
            .as_ref()
            .unwrap()
            .delete_object_tagging()
            .bucket(&self.bucket)
            .key(&target_key)
            .set_version_id(version_id.clone())
            .send()
            .await
            .context("aws_sdk_s3::client::delete_object_tagging() failed.")?;

        info!(
            key = key,
            target_version_id = version_id_str,
            target_key = target_key,
            "sync(delete tagging only) completed.",
        );

        Ok(result)
    }

    async fn is_versioning_enabled(&self) -> Result<bool> {
        let result = self
            .client
            .as_ref()
            .unwrap()
            .get_bucket_versioning()
            .bucket(&self.bucket)
            .send()
            .await
            .context("aws_sdk_s3::client::get_bucket_versioning() failed.")?;

        if result.status().is_none() {
            return Ok(false);
        }

        Ok(*result.status().unwrap() == BucketVersioningStatus::Enabled)
    }

    fn get_client(&self) -> Option<Arc<Client>> {
        self.client.clone()
    }

    fn get_stats_sender(&self) -> Sender<SyncStatistics> {
        self.stats_sender.clone()
    }

    async fn send_stats(&self, stats: SyncStatistics) {
        let _ = self.stats_sender.send(stats).await;
    }

    #[cfg_attr(coverage_nightly, coverage(off))]
    fn get_local_path(&self) -> PathBuf {
        unimplemented!();
    }

    fn get_rate_limit_bandwidth(&self) -> Option<Arc<RateLimiter>> {
        self.rate_limit_bandwidth.clone()
    }

    fn generate_copy_source_key(&self, key: &str, version_id: Option<String>) -> String {
        let full_key = generate_full_key(&self.prefix, key);
        let full_key = urlencoding::encode(&full_key);

        if version_id.is_some() {
            return format!(
                "{}/{}?versionId={}",
                &self.bucket,
                full_key,
                version_id.unwrap()
            );
        }
        format!("{}/{}", &self.bucket, full_key)
    }

    fn set_warning(&self) {
        self.has_warning
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }
}

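/// Removes the first occurrence of `prefix` from `key`; returns an empty string
/// when the key equals the prefix.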
pub fn remove_s3_prefix(key: &str, prefix: &str) -> String {
    key.replacen(prefix, "", 1)
}

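/// Concatenates `prefix` and `key` as-is; any separating `/` must already be part
/// of the prefix.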
pub fn generate_full_key(prefix: &str, key: &str) -> String {
    format!("{prefix}{key}")
}

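/// S3 Express One Zone directory bucket names always end with the `--x-s3` suffix.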
fn is_express_onezone_storage(bucket: &str) -> bool {
    bucket.ends_with(EXPRESS_ONEZONE_STORAGE_SUFFIX)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::args::parse_from_args;
    use crate::types::token::create_pipeline_cancellation_token;
    use tracing_subscriber::EnvFilter;

    #[test]
    fn remove_s3_prefix_test() {
        init_dummy_tracing_subscriber();

        assert_eq!(remove_s3_prefix("dir1/data1", "dir1/data1"), "");

        assert_eq!(remove_s3_prefix("dir1/data1", "dir1"), "/data1");
        assert_eq!(remove_s3_prefix("dir1/data1", "dir1/"), "data1");
        assert_eq!(remove_s3_prefix("/dir1/data1", "/dir1"), "/data1");
        assert_eq!(remove_s3_prefix("/dir1/data1", "/dir1/"), "data1");
    }

    #[test]
    fn is_express_onezone_storage_test() {
        init_dummy_tracing_subscriber();

        assert!(is_express_onezone_storage("bucket--x-s3"));

        assert!(!is_express_onezone_storage("bucket-x-s3"));
        assert!(!is_express_onezone_storage("bucket--x-s3s"));
        assert!(!is_express_onezone_storage("bucket"));
    }

    #[tokio::test]
    async fn create_storage() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(storage.get_client().is_some());
    }

    #[tokio::test]
    async fn get_object_error() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--dry-run",
            "--target-access-key",
            "dummy_access_key",
            "--target-secret-access-key",
            "dummy_secret_access_key",
            "--aws-max-attempts",
            "1",
            "--target-endpoint-url",
            "https://invalid-s3-endpoint-url.6329313.local:65535",
            "--force-retry-count",
            "1",
            "--force-retry-interval-milliseconds",
            "1",
            "./test_data/",
            "s3://dummy-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.target.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.target_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(
            storage
                .get_object(
                    "source/data1",
                    None,
                    None,
                    None,
                    None,
                    SseCustomerKey { key: None },
                    None,
                )
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn is_local_storage() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(storage.get_client().is_some());
        assert!(!storage.is_local_storage());
    }

    #[tokio::test]
    #[should_panic]
    async fn create_storage_panic() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "/source-dir",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn stats_channel_test() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, stats_receiver) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        let stats_sender = storage.get_stats_sender();

        stats_sender.send(SyncBytes(0)).await.unwrap();
        assert_eq!(stats_receiver.recv().await.unwrap(), SyncBytes(0));
    }

    #[tokio::test]
    async fn generate_full_key_test() {
        init_dummy_tracing_subscriber();

        assert_eq!(generate_full_key("dir1/", "data1"), "dir1/data1");
        assert_eq!(generate_full_key("dir1", "data1"), "dir1data1");

        assert_eq!(generate_full_key("data1", "data1"), "data1data1");
    }

    fn init_dummy_tracing_subscriber() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(
                EnvFilter::try_from_default_env()
                    .or_else(|_| EnvFilter::try_new("dummy=trace"))
                    .unwrap(),
            )
            .try_init();
    }
}