s3sync/storage/s3/
mod.rs

1use anyhow::{Context, Result, anyhow};
2use async_channel::Sender;
3use async_trait::async_trait;
4use aws_sdk_s3::Client;
5use aws_sdk_s3::operation::delete_object::DeleteObjectOutput;
6use aws_sdk_s3::operation::delete_object_tagging::DeleteObjectTaggingOutput;
7use aws_sdk_s3::operation::get_object::GetObjectOutput;
8use aws_sdk_s3::operation::get_object_tagging::GetObjectTaggingOutput;
9use aws_sdk_s3::operation::head_object::HeadObjectOutput;
10use aws_sdk_s3::operation::put_object::PutObjectOutput;
11use aws_sdk_s3::operation::put_object_tagging::PutObjectTaggingOutput;
12use aws_sdk_s3::types::builders::ObjectPartBuilder;
13use aws_sdk_s3::types::{
14    BucketVersioningStatus, ChecksumMode, DeleteMarkerEntry, ObjectAttributes, ObjectPart,
15    ObjectVersion, RequestPayer, Tagging,
16};
17use aws_smithy_types_convert::date_time::DateTimeExt;
18use leaky_bucket::RateLimiter;
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::future::Future;
22use std::path::PathBuf;
23use std::pin::Pin;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use tokio::sync::Semaphore;
27use tokio::task::JoinSet;
28use tracing::{debug, error, info};
29
30use crate::Config;
31use crate::config::ClientConfig;
32use crate::storage::checksum::AdditionalChecksum;
33use crate::storage::s3::upload_manager::UploadManager;
34use crate::storage::{
35    Storage, StorageFactory, StorageTrait, convert_head_to_get_object_output,
36    convert_to_buf_byte_stream_with_callback,
37};
38use crate::types::SyncStatistics::{SyncBytes, SyncSkip};
39use crate::types::event_callback::{EventData, EventType};
40use crate::types::token::PipelineCancellationToken;
41use crate::types::{
42    ObjectChecksum, ObjectVersions, S3syncObject, SseCustomerKey, StoragePath, SyncStatistics,
43    clone_object_version_with_key, get_additional_checksum, is_full_object_checksum,
44};
45
46const EXPRESS_ONEZONE_STORAGE_SUFFIX: &str = "--x-s3";
47
48mod client_builder;
49mod upload_manager;
50
/// Zero-sized factory that builds S3-backed [`Storage`] instances via [`StorageFactory::create`].
pub struct S3StorageFactory {}
52
53#[async_trait]
54impl StorageFactory for S3StorageFactory {
55    async fn create(
56        config: Config,
57        path: StoragePath,
58        cancellation_token: PipelineCancellationToken,
59        stats_sender: Sender<SyncStatistics>,
60        client_config: Option<ClientConfig>,
61        request_payer: Option<RequestPayer>,
62        rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
63        rate_limit_bandwidth: Option<Arc<RateLimiter>>,
64        has_warning: Arc<AtomicBool>,
65        _object_to_list: Option<String>,
66    ) -> Storage {
67        S3Storage::boxed_new(
68            config,
69            path,
70            cancellation_token,
71            stats_sender,
72            Some(Arc::new(
73                client_config.as_ref().unwrap().create_client().await,
74            )),
75            request_payer,
76            rate_limit_objects_per_sec,
77            rate_limit_bandwidth,
78            has_warning,
79        )
80        .await
81    }
82}
83
/// S3 implementation of [`StorageTrait`]. `Clone` is derived so listing work
/// can be moved onto spawned tasks (see `list_objects_with_parallel`).
#[derive(Clone)]
struct S3Storage {
    config: Config,
    /// Target bucket name.
    bucket: String,
    /// Key prefix every operation in this storage is scoped under.
    prefix: String,
    cancellation_token: PipelineCancellationToken,
    /// Shared SDK client; the factory always supplies `Some`, and every method
    /// in this file unwraps it.
    client: Option<Arc<Client>>,
    request_payer: Option<RequestPayer>,
    /// Channel for reporting sync statistics (e.g. skipped keys).
    stats_sender: Sender<SyncStatistics>,
    /// Optional throttle applied before each listing API call.
    rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
    // NOTE(review): presumably throttles transferred bytes; not exercised in
    // the code visible in this chunk — confirm against the transfer paths.
    rate_limit_bandwidth: Option<Arc<RateLimiter>>,
    // NOTE(review): warning flag shared with the pipeline; only stored here,
    // not set in this chunk.
    has_warning: Arc<AtomicBool>,
    /// Bounds the number of concurrent listing workers (sized from
    /// `config.max_parallel_listings`).
    listing_worker_semaphore: Arc<Semaphore>,
}
98
99impl S3Storage {
100    #[allow(clippy::too_many_arguments)]
101    async fn boxed_new(
102        config: Config,
103        path: StoragePath,
104        cancellation_token: PipelineCancellationToken,
105        stats_sender: Sender<SyncStatistics>,
106        client: Option<Arc<Client>>,
107        request_payer: Option<RequestPayer>,
108        rate_limit_objects_per_sec: Option<Arc<RateLimiter>>,
109        rate_limit_bandwidth: Option<Arc<RateLimiter>>,
110        has_warning: Arc<AtomicBool>,
111    ) -> Storage {
112        let (bucket, prefix) = if let StoragePath::S3 { bucket, prefix } = path {
113            (bucket, prefix)
114        } else {
115            panic!("s3 path not found")
116        };
117
118        let max_parallel_listings: usize = config.max_parallel_listings as usize;
119        let storage = S3Storage {
120            config,
121            bucket,
122            prefix,
123            cancellation_token,
124            client,
125            request_payer,
126            stats_sender,
127            rate_limit_objects_per_sec,
128            rate_limit_bandwidth,
129            has_warning,
130            listing_worker_semaphore: Arc::new(Semaphore::new(max_parallel_listings)),
131        };
132
133        Box::new(storage)
134    }
135
136    async fn aggregate_delete_markers(
137        &self,
138        delete_marker_entries: &[DeleteMarkerEntry],
139        s3sync_object_map: &mut HashMap<String, Vec<S3syncObject>>,
140    ) {
141        for delete_marker in delete_marker_entries {
142            // If point-in-time is set, Intermediate delete markers will be needed.
143            if self.config.point_in_time.is_none() && !delete_marker.is_latest().unwrap() {
144                continue;
145            }
146
147            let key_without_prefix = remove_s3_prefix(delete_marker.key().unwrap(), &self.prefix);
148            if key_without_prefix.is_empty() {
149                let mut event_data = EventData::new(EventType::SYNC_FILTERED);
150                event_data.key = Some(delete_marker.key().unwrap().to_string());
151                // skipcq: RS-W1070
152                event_data.source_version_id = delete_marker.version_id.clone();
153                event_data.message = Some("Key that is same as prefix is skipped.".to_string());
154                self.config.event_manager.trigger_event(event_data).await;
155
156                self.send_stats(SyncSkip {
157                    key: delete_marker.key().unwrap().to_string(),
158                })
159                .await;
160
161                let key = delete_marker.key().unwrap();
162                debug!(key = key, "Key that is same as prefix is skipped.");
163
164                continue;
165            }
166
167            let delete_marker_object =
168                S3syncObject::clone_delete_marker_with_key(delete_marker, &key_without_prefix);
169
170            if s3sync_object_map.get_mut(&key_without_prefix).is_none() {
171                s3sync_object_map.insert(key_without_prefix.to_string(), ObjectVersions::new());
172            }
173            s3sync_object_map
174                .get_mut(&key_without_prefix)
175                .unwrap()
176                .push(delete_marker_object);
177        }
178    }
179
180    async fn aggregate_object_versions_and_send(
181        &self,
182        sender: &Sender<S3syncObject>,
183        object_versions: &[ObjectVersion],
184        s3sync_object_map: &mut HashMap<String, ObjectVersions>,
185    ) -> Result<()> {
186        let mut previous_key = "".to_string();
187        for object in object_versions {
188            let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
189            if key_without_prefix.is_empty() {
190                let mut event_data = EventData::new(EventType::SYNC_FILTERED);
191                event_data.key = object.key().map(|k| k.to_string());
192                // skipcq: RS-W1070
193                event_data.source_version_id = object.version_id.clone();
194                event_data.message = Some("Key that is same as prefix is skipped.".to_string());
195                self.config.event_manager.trigger_event(event_data).await;
196
197                self.send_stats(SyncSkip {
198                    key: object.key().unwrap().to_string(),
199                })
200                .await;
201
202                let key = object.key().unwrap();
203                debug!(key = key, "Key that is same as prefix is skipped.");
204
205                continue;
206            }
207
208            if !previous_key.is_empty() && previous_key != key_without_prefix {
209                Self::send_object_versions_with_sort(
210                    sender,
211                    &mut s3sync_object_map.remove(&previous_key).unwrap(),
212                )
213                .await?;
214            }
215
216            let versioning_object =
217                S3syncObject::clone_versioning_object_with_key(object, &key_without_prefix);
218
219            if s3sync_object_map.get(&key_without_prefix).is_none() {
220                s3sync_object_map.insert(key_without_prefix.to_string(), ObjectVersions::new());
221            }
222            s3sync_object_map
223                .get_mut(&key_without_prefix)
224                .unwrap()
225                .push(versioning_object);
226
227            previous_key = key_without_prefix;
228        }
229
230        Ok(())
231    }
232
233    async fn send_object_versions_with_sort(
234        sender: &Sender<S3syncObject>,
235        object_versions: &mut ObjectVersions,
236    ) -> Result<()> {
237        object_versions.sort_by(|a, b| {
238            if a.is_latest() {
239                return Ordering::Greater;
240            }
241            if b.is_latest() {
242                return Ordering::Less;
243            }
244
245            a.last_modified()
246                .as_nanos()
247                .cmp(&b.last_modified().as_nanos())
248        });
249
250        for object in object_versions {
251            debug!(
252                key = object.key(),
253                "list_object_versions(): sending remote object."
254            );
255            if let Err(e) = sender
256                .send(object.clone())
257                .await
258                .context("async_channel::Sender::send() failed.")
259            {
260                return if !sender.is_closed() { Err(e) } else { Ok(()) };
261            }
262        }
263
264        Ok(())
265    }
266
267    async fn exec_rate_limit_objects_per_sec(&self) {
268        if self.rate_limit_objects_per_sec.is_some() {
269            self.rate_limit_objects_per_sec
270                .as_ref()
271                .unwrap()
272                .acquire(1)
273                .await;
274        }
275    }
276
    /// Recursively lists objects, spawning one task per discovered common
    /// prefix (using the "/" delimiter) until `max_parallel_listing_max_depth`
    /// is exceeded, after which listing becomes flat (no delimiter).
    ///
    /// Returns a boxed future because the function is recursive. Concurrency
    /// is bounded by `listing_worker_semaphore`: this call's own permit is
    /// dropped before a child's permit is acquired, so a parent never holds a
    /// permit while blocked waiting for its children.
    fn list_objects_with_parallel<'a>(
        &'a self,
        prefix: &'a str,
        sender: &'a Sender<S3syncObject>,
        max_keys: i32,
        current_depth: usize,
        permit: tokio::sync::OwnedSemaphorePermit,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async move {
            // An empty prefix marks the root call; start from the configured prefix.
            let is_root_prefix = prefix.is_empty();
            let prefix = if is_root_prefix {
                self.prefix.clone()
            } else {
                prefix.to_string()
            };

            // Until max_parallel_listing_max_depth, we need to set a delimiter to "/" for parallel listing of sub-prefixes.
            let delimiter = if current_depth <= self.config.max_parallel_listing_max_depth as usize
            {
                Some("/".to_string())
            } else {
                None
            };

            let mut current_permit = Some(permit);

            debug!(
                root_prefix = self.prefix,
                prefix = prefix.as_str(),
                "Start listing objects."
            );

            // Paginate ListObjectsV2 until exhausted or the pipeline is cancelled.
            let mut continuation_token = None;
            loop {
                // Listing objects is rate-limited.
                self.exec_rate_limit_objects_per_sec().await;

                let list_object_v2 = self
                    .client
                    .as_ref()
                    .unwrap()
                    .list_objects_v2()
                    .set_request_payer(self.request_payer.clone())
                    .bucket(&self.bucket)
                    .prefix(&prefix)
                    .set_delimiter(delimiter.clone())
                    .set_continuation_token(continuation_token)
                    .max_keys(max_keys);

                // Check cancellation after building the request but before sending it.
                if self.cancellation_token.is_cancelled() {
                    debug!("list_objects() canceled.");
                    break;
                }

                let list_objects_output = list_object_v2.send().await?;

                for object in list_objects_output.contents() {
                    let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
                    // A key identical to the prefix strips to empty: report and skip.
                    if key_without_prefix.is_empty() {
                        let mut event_data = EventData::new(EventType::SYNC_FILTERED);
                        event_data.key = object.key().map(|k| k.to_string());
                        // skipcq: RS-W1070
                        event_data.message =
                            Some("Key that is same as prefix is skipped.".to_string());
                        self.config.event_manager.trigger_event(event_data).await;

                        self.send_stats(SyncSkip {
                            key: object.key().unwrap().to_string(),
                        })
                        .await;

                        let key = object.key().unwrap();
                        debug!(key = key, "Key that is same as prefix is skipped.");

                        continue;
                    }

                    let non_versioning_object = S3syncObject::clone_non_versioning_object_with_key(
                        object,
                        &key_without_prefix,
                    );

                    debug!(key = object.key(), "list_objects(): sending remote object.");
                    if let Err(e) = sender
                        .send(non_versioning_object.clone())
                        .await
                        .context("async_channel::Sender::send() failed.")
                    {
                        error!("Failed to send object: {}", e);
                        // A closed channel means the pipeline shut down; not an error.
                        return if !sender.is_closed() { Err(e) } else { Ok(()) };
                    }
                }

                // Recurse into each discovered sub-prefix on its own spawned task.
                if let Some(common_prefixes) = list_objects_output.common_prefixes.clone() {
                    let mut join_set = JoinSet::new();
                    for common_prefix in common_prefixes {
                        if self.cancellation_token.is_cancelled() {
                            debug!("list_objects() canceled.");
                            break;
                        }

                        if let Some(sub_prefix) = common_prefix.prefix() {
                            let storage = self.clone();
                            let sub_prefix = sub_prefix.to_string();
                            let sender = sender.clone();

                            debug!(
                                root_prefix = self.prefix,
                                prefix = prefix.as_str(),
                                sub_prefix = sub_prefix.as_str(),
                                "Start listing objects in sub-prefix."
                            );

                            // Release our own permit before acquiring one for the
                            // child, so we never hold two permits at once.
                            if let Some(permit) = current_permit {
                                drop(permit);
                                current_permit = None;
                            }

                            let new_permit = self
                                .listing_worker_semaphore
                                .clone()
                                .acquire_owned()
                                .await
                                .unwrap();

                            join_set.spawn(async move {
                                storage
                                    .list_objects_with_parallel(
                                        &sub_prefix,
                                        &sender,
                                        max_keys,
                                        current_depth + 1,
                                        new_permit,
                                    )
                                    .await
                                    .context("Failed to list objects in sub-prefix.")
                            });
                        }
                    }

                    // Propagate the first child failure and cancel the whole pipeline.
                    while let Some(join_result) = join_set.join_next().await {
                        if let Err(join_error) = join_result {
                            error!("Failed to join in sub-prefix: {}", join_error);
                            self.cancellation_token.cancel();
                            return Err(anyhow!(join_error));
                        }

                        if let Err(task_error) = join_result.unwrap() {
                            error!("Failed to list objects in sub-prefix: {}", task_error);
                            self.cancellation_token.cancel();
                            return Err(anyhow!(task_error));
                        }
                    }
                }

                if !list_objects_output.is_truncated().unwrap() {
                    break;
                }

                continuation_token = list_objects_output
                    .next_continuation_token()
                    .map(|token| token.to_string());
            }

            // Release the permit if no sub-prefix recursion released it already.
            if let Some(permit) = current_permit {
                drop(permit);
            }

            Ok(())
        })
    }
448}
449
450#[async_trait]
451impl StorageTrait for S3Storage {
    /// Always `false`: this storage is backed by S3, not the local filesystem.
    fn is_local_storage(&self) -> bool {
        false
    }
455
    /// Delegates to the module-level helper on the bucket name.
    // NOTE(review): the helper is defined elsewhere in this module; presumably
    // it matches the `EXPRESS_ONEZONE_STORAGE_SUFFIX` ("--x-s3") bucket-name
    // suffix — confirm against its definition.
    fn is_express_onezone_storage(&self) -> bool {
        is_express_onezone_storage(&self.bucket)
    }
459
460    async fn list_objects(
461        &self,
462        sender: &Sender<S3syncObject>,
463        max_keys: i32,
464        _warn_as_error: bool,
465    ) -> Result<()> {
466        if self.config.max_parallel_listings > 1 {
467            if !self.is_express_onezone_storage() {
468                debug!(
469                    "Using parallel listing with {} workers.",
470                    self.config.max_parallel_listings
471                );
472                let permit = self
473                    .listing_worker_semaphore
474                    .clone()
475                    .acquire_owned()
476                    .await
477                    .unwrap();
478
479                return self
480                    .list_objects_with_parallel("", sender, max_keys, 1, permit)
481                    .await
482                    .context("Failed to parallel object listing.");
483            } else if self.config.allow_parallel_listings_in_express_one_zone {
484                debug!(
485                    "Using parallel listing with {} workers(express one zone).",
486                    self.config.max_parallel_listings
487                );
488
489                let permit = self
490                    .listing_worker_semaphore
491                    .clone()
492                    .acquire_owned()
493                    .await
494                    .unwrap();
495
496                return self
497                    .list_objects_with_parallel("", sender, max_keys, 1, permit)
498                    .await
499                    .context("Failed to parallel object listing.");
500            }
501        }
502
503        debug!("Disabled parallel listing.");
504
505        let mut continuation_token = None;
506        loop {
507            let list_object_v2 = self
508                .client
509                .as_ref()
510                .unwrap()
511                .list_objects_v2()
512                .set_request_payer(self.request_payer.clone())
513                .bucket(&self.bucket)
514                .prefix(&self.prefix)
515                .set_continuation_token(continuation_token)
516                .max_keys(max_keys);
517
518            if self.cancellation_token.is_cancelled() {
519                debug!("list_objects() canceled.");
520                break;
521            }
522
523            let list_objects_output = list_object_v2
524                .send()
525                .await
526                .context("aws_sdk_s3::client::list_objects_v2() failed.")?;
527
528            for object in list_objects_output.contents() {
529                let key_without_prefix = remove_s3_prefix(object.key().unwrap(), &self.prefix);
530                if key_without_prefix.is_empty() {
531                    let mut event_data = EventData::new(EventType::SYNC_FILTERED);
532                    event_data.key = object.key().map(|k| k.to_string());
533                    // skipcq: RS-W1070
534                    event_data.message = Some("Key that is same as prefix is skipped.".to_string());
535                    self.config.event_manager.trigger_event(event_data).await;
536
537                    self.send_stats(SyncSkip {
538                        key: object.key().unwrap().to_string(),
539                    })
540                    .await;
541
542                    let key = object.key().unwrap();
543                    debug!(key = key, "Key that is same as prefix is skipped.");
544
545                    continue;
546                }
547
548                let non_versioning_object =
549                    S3syncObject::clone_non_versioning_object_with_key(object, &key_without_prefix);
550
551                debug!(key = object.key(), "list_objects(): sending remote object.");
552                if let Err(e) = sender
553                    .send(non_versioning_object.clone())
554                    .await
555                    .context("async_channel::Sender::send() failed.")
556                {
557                    return if !sender.is_closed() { Err(e) } else { Ok(()) };
558                }
559            }
560
561            if !list_objects_output.is_truncated().unwrap() {
562                break;
563            }
564
565            continuation_token = list_objects_output
566                .next_continuation_token()
567                .map(|token| token.to_string());
568        }
569
570        Ok(())
571    }
572
    /// Lists all versions (and delete markers) under the configured prefix,
    /// aggregating them per key and streaming each completed key's sorted
    /// versions to `sender`.
    ///
    /// Pagination uses the key/version-id marker pair. Versions accumulated
    /// for keys whose listing did not complete mid-page are flushed after the
    /// loop ends.
    async fn list_object_versions(
        &self,
        sender: &Sender<S3syncObject>,
        max_keys: i32,
        _warn_as_error: bool,
    ) -> Result<()> {
        let mut key_marker = None;
        let mut version_id_marker = None;

        // key (prefix stripped) -> accumulated versions/delete markers.
        let mut s3sync_versioning_map = HashMap::new();

        loop {
            let list_object_versions = self
                .client
                .as_ref()
                .unwrap()
                .list_object_versions()
                .set_request_payer(self.request_payer.clone())
                .bucket(&self.bucket)
                .prefix(&self.prefix)
                .set_key_marker(key_marker)
                .set_version_id_marker(version_id_marker)
                .max_keys(max_keys);

            // Check cancellation after building the request but before sending it.
            if self.cancellation_token.is_cancelled() {
                debug!("list_object_versions() canceled.");
                break;
            }

            let list_object_versions_output = list_object_versions
                .send()
                .await
                .context("aws_sdk_s3::client::list_object_versions() failed.")?;

            self.aggregate_delete_markers(
                list_object_versions_output.delete_markers(),
                &mut s3sync_versioning_map,
            )
            .await;

            self.aggregate_object_versions_and_send(
                sender,
                list_object_versions_output.versions(),
                &mut s3sync_versioning_map,
            )
            .await?;

            if !list_object_versions_output.is_truncated().unwrap() {
                break;
            }

            key_marker = list_object_versions_output
                .next_key_marker()
                .map(|marker| marker.to_string());
            version_id_marker = list_object_versions_output
                .next_version_id_marker()
                .map(|marker| marker.to_string());
        }

        // send remaining versioning objects
        for versioning_objects in s3sync_versioning_map.values_mut() {
            Self::send_object_versions_with_sort(sender, versioning_objects).await?;
        }

        Ok(())
    }
639
640    async fn get_object(
641        &self,
642        key: &str,
643        version_id: Option<String>,
644        checksum_mode: Option<ChecksumMode>,
645        range: Option<String>,
646        sse_c: Option<String>,
647        sse_c_key: SseCustomerKey,
648        sse_c_key_md5: Option<String>,
649    ) -> Result<GetObjectOutput> {
650        if self.config.dry_run {
651            let head_object_result = self
652                .client
653                .as_ref()
654                .unwrap()
655                .head_object()
656                .set_request_payer(self.request_payer.clone())
657                .bucket(&self.bucket)
658                .key(generate_full_key(&self.prefix, key))
659                .set_version_id(version_id)
660                .set_checksum_mode(checksum_mode)
661                .set_range(range)
662                .set_sse_customer_algorithm(sse_c)
663                .set_sse_customer_key(sse_c_key.key.clone())
664                .set_sse_customer_key_md5(sse_c_key_md5)
665                .send()
666                .await
667                .context("aws_sdk_s3::client::head_object() failed.")?;
668
669            return Ok(convert_head_to_get_object_output(head_object_result));
670        }
671
672        let result = self
673            .client
674            .as_ref()
675            .unwrap()
676            .get_object()
677            .set_request_payer(self.request_payer.clone())
678            .bucket(&self.bucket)
679            .key(generate_full_key(&self.prefix, key))
680            .set_version_id(version_id)
681            .set_checksum_mode(checksum_mode)
682            .set_range(range)
683            .set_sse_customer_algorithm(sse_c)
684            .set_sse_customer_key(sse_c_key.key.clone())
685            .set_sse_customer_key_md5(sse_c_key_md5)
686            .send()
687            .await
688            .context("aws_sdk_s3::client::get_object() failed.")?;
689
690        Ok(result)
691    }
692
693    async fn get_object_versions(&self, key: &str, max_keys: i32) -> Result<Vec<ObjectVersion>> {
694        let mut key_marker = None;
695        let mut version_id_marker = None;
696
697        let mut object_versions = Vec::new();
698
699        let key = generate_full_key(&self.prefix, key);
700        let key_without_prefix = remove_s3_prefix(&key, &self.prefix);
701
702        loop {
703            let list_object_versions = self
704                .client
705                .as_ref()
706                .unwrap()
707                .list_object_versions()
708                .set_request_payer(self.request_payer.clone())
709                .bucket(&self.bucket)
710                .prefix(&key)
711                .set_key_marker(key_marker)
712                .set_version_id_marker(version_id_marker)
713                .max_keys(max_keys);
714
715            if self.cancellation_token.is_cancelled() {
716                debug!("list_object_versions() canceled.");
717                break;
718            }
719
720            let list_object_versions_output = list_object_versions
721                .send()
722                .await
723                .context("aws_sdk_s3::client::list_object_versions() failed.")?;
724
725            object_versions.append(
726                &mut list_object_versions_output
727                    .versions()
728                    .iter()
729                    .filter(|&object| object.key().unwrap() == key)
730                    .cloned()
731                    .map(|object| clone_object_version_with_key(&object, &key_without_prefix))
732                    .collect(),
733            );
734
735            if !list_object_versions_output.is_truncated().unwrap() {
736                break;
737            }
738
739            key_marker = list_object_versions_output
740                .next_key_marker()
741                .map(|marker| marker.to_string());
742            version_id_marker = list_object_versions_output
743                .next_version_id_marker()
744                .map(|marker| marker.to_string());
745        }
746
747        Ok(object_versions)
748    }
749
750    async fn get_object_tagging(
751        &self,
752        key: &str,
753        version_id: Option<String>,
754    ) -> Result<GetObjectTaggingOutput> {
755        let result = self
756            .client
757            .as_ref()
758            .unwrap()
759            .get_object_tagging()
760            .set_request_payer(self.request_payer.clone())
761            .bucket(&self.bucket)
762            .key(generate_full_key(&self.prefix, key))
763            .set_version_id(version_id)
764            .send()
765            .await
766            .context("aws_sdk_s3::client::get_object_tagging() failed.")?;
767
768        Ok(result)
769    }
770
771    async fn head_object(
772        &self,
773        key: &str,
774        version_id: Option<String>,
775        checksum_mode: Option<ChecksumMode>,
776        range: Option<String>,
777        sse_c: Option<String>,
778        sse_c_key: SseCustomerKey,
779        sse_c_key_md5: Option<String>,
780    ) -> Result<HeadObjectOutput> {
781        let result = self
782            .client
783            .as_ref()
784            .unwrap()
785            .head_object()
786            .set_request_payer(self.request_payer.clone())
787            .bucket(&self.bucket)
788            .key(generate_full_key(&self.prefix, key))
789            .set_range(range)
790            .set_version_id(version_id)
791            .set_checksum_mode(checksum_mode)
792            .set_sse_customer_algorithm(sse_c)
793            .set_sse_customer_key(sse_c_key.key.clone())
794            .set_sse_customer_key_md5(sse_c_key_md5)
795            .send()
796            .await
797            .context("aws_sdk_s3::client::head_object() failed.")?;
798
799        Ok(result)
800    }
801
802    async fn head_object_first_part(
803        &self,
804        key: &str,
805        version_id: Option<String>,
806        checksum_mode: Option<ChecksumMode>,
807        sse_c: Option<String>,
808        sse_c_key: SseCustomerKey,
809        sse_c_key_md5: Option<String>,
810    ) -> Result<HeadObjectOutput> {
811        let result = self
812            .client
813            .as_ref()
814            .unwrap()
815            .head_object()
816            .set_request_payer(self.request_payer.clone())
817            .bucket(&self.bucket)
818            .key(generate_full_key(&self.prefix, key))
819            .set_version_id(version_id)
820            .part_number(1)
821            .set_checksum_mode(checksum_mode)
822            .set_sse_customer_algorithm(sse_c)
823            .set_sse_customer_key(sse_c_key.key.clone())
824            .set_sse_customer_key_md5(sse_c_key_md5)
825            .send()
826            .await
827            .context("aws_sdk_s3::client::head_object() failed.")?;
828
829        Ok(result)
830    }
831
832    async fn get_object_parts(
833        &self,
834        key: &str,
835        version_id: Option<String>,
836        sse_c: Option<String>,
837        sse_c_key: SseCustomerKey,
838        sse_c_key_md5: Option<String>,
839    ) -> Result<Vec<ObjectPart>> {
840        let object = self
841            .client
842            .as_ref()
843            .unwrap()
844            .head_object()
845            .set_request_payer(self.request_payer.clone())
846            .bucket(&self.bucket)
847            .key(generate_full_key(&self.prefix, key))
848            .set_version_id(version_id.clone())
849            .part_number(1)
850            .set_sse_customer_algorithm(sse_c.clone())
851            .set_sse_customer_key(sse_c_key.key.clone())
852            .set_sse_customer_key_md5(sse_c_key_md5.clone())
853            .send()
854            .await
855            .context("aws_sdk_s3::client::head_object() failed.")?;
856
857        let mut object_parts = vec![];
858
859        let parts_count = object.parts_count().unwrap_or_default();
860        if parts_count == 0 {
861            return Ok(vec![]);
862        }
863
864        object_parts.push(
865            ObjectPartBuilder::default()
866                .size(object.content_length().unwrap())
867                .build(),
868        );
869
870        for part_number in 2..=parts_count {
871            let object = self
872                .client
873                .as_ref()
874                .unwrap()
875                .head_object()
876                .set_request_payer(self.request_payer.clone())
877                .bucket(&self.bucket)
878                .key(generate_full_key(&self.prefix, key))
879                .set_version_id(version_id.clone())
880                .part_number(part_number)
881                .set_sse_customer_algorithm(sse_c.clone())
882                .set_sse_customer_key(sse_c_key.key.clone())
883                .set_sse_customer_key_md5(sse_c_key_md5.clone())
884                .send()
885                .await
886                .context("aws_sdk_s3::client::head_object() failed.")?;
887
888            object_parts.push(
889                ObjectPartBuilder::default()
890                    .size(object.content_length().unwrap())
891                    .build(),
892            );
893        }
894
895        Ok(object_parts)
896    }
897
898    async fn get_object_parts_attributes(
899        &self,
900        key: &str,
901        version_id: Option<String>,
902        max_parts: i32,
903        sse_c: Option<String>,
904        sse_c_key: SseCustomerKey,
905        sse_c_key_md5: Option<String>,
906    ) -> Result<Vec<ObjectPart>> {
907        let mut object_parts = vec![];
908        let mut part_number_marker = None;
909        loop {
910            let object = self
911                .client
912                .as_ref()
913                .unwrap()
914                .get_object_attributes()
915                .set_request_payer(self.request_payer.clone())
916                .bucket(&self.bucket)
917                .key(generate_full_key(&self.prefix, key))
918                .set_version_id(version_id.clone())
919                .object_attributes(ObjectAttributes::ObjectParts)
920                .set_part_number_marker(part_number_marker)
921                .set_sse_customer_algorithm(sse_c.clone())
922                .set_sse_customer_key(sse_c_key.key.clone())
923                .set_sse_customer_key_md5(sse_c_key_md5.clone())
924                .max_parts(max_parts)
925                .send()
926                .await
927                .context("aws_sdk_s3::client::get_object_attributes() failed.")?;
928
929            // A full object checksum has empty object parts.
930            if object.object_parts().is_none() || object.object_parts().unwrap().parts().is_empty()
931            {
932                return Ok(vec![]);
933            }
934
935            for part in object.object_parts().unwrap().parts() {
936                object_parts.push(part.clone());
937            }
938
939            if !object.object_parts().unwrap().is_truncated().unwrap() {
940                break;
941            }
942
943            part_number_marker = object
944                .object_parts()
945                .unwrap()
946                .next_part_number_marker()
947                .map(|marker| marker.to_string());
948        }
949
950        Ok(object_parts)
951    }
952
953    async fn put_object(
954        &self,
955        key: &str,
956        source: Storage,
957        source_size: u64,
958        source_additional_checksum: Option<String>,
959        mut get_object_output_first_chunk: GetObjectOutput,
960        tagging: Option<String>,
961        object_checksum: Option<ObjectChecksum>,
962        if_match: Option<String>,
963        copy_source_if_match: Option<String>,
964    ) -> Result<PutObjectOutput> {
965        let mut version_id = "".to_string();
966        if let Some(source_version_id) = get_object_output_first_chunk.version_id().as_ref() {
967            version_id = source_version_id.to_string();
968        }
969        let target_key = generate_full_key(&self.prefix, key);
970        let source_key = key;
971        let source_last_modified = aws_smithy_types::DateTime::from_millis(
972            get_object_output_first_chunk
973                .last_modified()
974                .unwrap()
975                .to_millis()?,
976        )
977        .to_chrono_utc()?
978        .to_rfc3339();
979
980        if self.config.dry_run {
981            self.send_stats(SyncBytes(source_size)).await;
982
983            let mut event_data = EventData::new(EventType::SYNC_COMPLETE);
984            event_data.key = Some(key.to_string());
985            // skipcq: RS-W1070
986            event_data.source_version_id = get_object_output_first_chunk
987                .version_id()
988                .as_ref()
989                .map(|v| v.to_string());
990            event_data.source_last_modified =
991                get_object_output_first_chunk.last_modified().copied();
992            // skipcq: RS-W1070
993            event_data.source_etag = get_object_output_first_chunk.e_tag().map(|e| e.to_string());
994            event_data.source_size = Some(source_size);
995            event_data.target_size = Some(source_size); // Assuming the size is the same as source
996            self.config.event_manager.trigger_event(event_data).await;
997
998            info!(
999                key = key,
1000                source_version_id = version_id,
1001                source_last_modified = source_last_modified,
1002                target_key = target_key,
1003                size = source_size.to_string(),
1004                if_match = if_match.clone(),
1005                copy_source_if_match = copy_source_if_match.clone(),
1006                "[dry-run] sync completed.",
1007            );
1008
1009            return Ok(PutObjectOutput::builder().build());
1010        }
1011
1012        // In the case of full object checksum, we don't need to calculate checksum for each part and
1013        // don't need to pass it to the upload manager.
1014        let additional_checksum_value = get_additional_checksum(
1015            &get_object_output_first_chunk,
1016            object_checksum.as_ref().unwrap().checksum_algorithm.clone(),
1017        );
1018        let full_object_checksum = is_full_object_checksum(&additional_checksum_value);
1019        #[allow(clippy::unnecessary_unwrap)]
1020        let checksum = if object_checksum.is_some()
1021            && object_checksum
1022                .as_ref()
1023                .unwrap()
1024                .checksum_algorithm
1025                .is_some()
1026            && !self.config.full_object_checksum
1027            && !full_object_checksum
1028        {
1029            Some(Arc::new(AdditionalChecksum::new(
1030                object_checksum
1031                    .as_ref()
1032                    .unwrap()
1033                    .checksum_algorithm
1034                    .as_ref()
1035                    .unwrap()
1036                    .clone(),
1037                self.config.full_object_checksum,
1038            )))
1039        } else {
1040            None
1041        };
1042
1043        get_object_output_first_chunk.body = convert_to_buf_byte_stream_with_callback(
1044            get_object_output_first_chunk.body.into_async_read(),
1045            self.get_stats_sender(),
1046            self.rate_limit_bandwidth.clone(),
1047            checksum,
1048            object_checksum.clone(),
1049        );
1050
1051        let mut upload_manager = UploadManager::new(
1052            self.client.clone().unwrap(),
1053            self.config.clone(),
1054            self.request_payer.clone(),
1055            self.cancellation_token.clone(),
1056            self.get_stats_sender(),
1057            tagging,
1058            object_checksum.unwrap_or_default().object_parts,
1059            self.is_express_onezone_storage(),
1060            source,
1061            source_key.to_string(),
1062            source_size,
1063            source_additional_checksum,
1064            if_match,
1065            copy_source_if_match,
1066            self.has_warning.clone(),
1067        );
1068
1069        self.exec_rate_limit_objects_per_sec().await;
1070
1071        let put_object_output = upload_manager
1072            .upload(&self.bucket, &target_key, get_object_output_first_chunk)
1073            .await?;
1074
1075        // If preprocess_callback is registered and preprocess was cancelled, e_tag will be None.
1076        if put_object_output.e_tag.is_some() {
1077            info!(
1078                key = key,
1079                source_version_id = version_id,
1080                source_last_modified = source_last_modified,
1081                target_key = target_key,
1082                size = source_size,
1083                "sync completed.",
1084            );
1085        }
1086
1087        Ok(put_object_output)
1088    }
1089
1090    async fn put_object_tagging(
1091        &self,
1092        key: &str,
1093        version_id: Option<String>,
1094        tagging: Tagging,
1095    ) -> Result<PutObjectTaggingOutput> {
1096        let target_key = generate_full_key(&self.prefix, key);
1097        let version_id_str = version_id.clone().unwrap_or_default();
1098
1099        if self.config.dry_run {
1100            info!(
1101                key = key,
1102                target_version_id = version_id_str,
1103                target_key = target_key,
1104                "[dry-run] sync(tagging only) completed.",
1105            );
1106
1107            return Ok(PutObjectTaggingOutput::builder().build());
1108        }
1109
1110        self.exec_rate_limit_objects_per_sec().await;
1111
1112        let result = self
1113            .client
1114            .as_ref()
1115            .unwrap()
1116            .put_object_tagging()
1117            .set_request_payer(self.request_payer.clone())
1118            .bucket(&self.bucket)
1119            .key(&target_key)
1120            .set_version_id(version_id.clone())
1121            .tagging(tagging)
1122            .send()
1123            .await
1124            .context("aws_sdk_s3::client::put_object_tagging() failed.")?;
1125
1126        info!(
1127            key = key,
1128            target_version_id = version_id_str,
1129            target_key = target_key,
1130            "sync(tagging only) completed.",
1131        );
1132
1133        Ok(result)
1134    }
1135
1136    async fn delete_object(
1137        &self,
1138        key: &str,
1139        version_id: Option<String>,
1140        if_match: Option<String>,
1141    ) -> Result<DeleteObjectOutput> {
1142        let target_key = generate_full_key(&self.prefix, key);
1143        let version_id_str = version_id.clone().unwrap_or_default();
1144
1145        if self.config.dry_run {
1146            info!(
1147                key = key,
1148                target_version_id = version_id_str,
1149                target_key = target_key,
1150                if_match = if_match.clone(),
1151                "[dry-run] delete completed.",
1152            );
1153
1154            return Ok(DeleteObjectOutput::builder().build());
1155        }
1156
1157        self.exec_rate_limit_objects_per_sec().await;
1158
1159        let result = self
1160            .client
1161            .as_ref()
1162            .unwrap()
1163            .delete_object()
1164            .set_request_payer(self.request_payer.clone())
1165            .bucket(&self.bucket)
1166            .key(&target_key)
1167            .set_version_id(version_id.clone())
1168            .set_if_match(if_match.clone())
1169            .send()
1170            .await
1171            .context("aws_sdk_s3::client::delete_object() failed.")?;
1172
1173        info!(
1174            key = key,
1175            target_version_id = version_id_str,
1176            target_key = target_key,
1177            if_match = if_match.clone(),
1178            "delete completed.",
1179        );
1180
1181        Ok(result)
1182    }
1183
1184    async fn delete_object_tagging(
1185        &self,
1186        key: &str,
1187        version_id: Option<String>,
1188    ) -> Result<DeleteObjectTaggingOutput> {
1189        let target_key = generate_full_key(&self.prefix, key);
1190        let version_id_str = version_id.clone().unwrap_or_default();
1191
1192        if self.config.dry_run {
1193            info!(
1194                key = key,
1195                target_version_id = version_id_str,
1196                target_key = target_key,
1197                "[dry-run] sync(delete tagging only) completed.",
1198            );
1199
1200            return Ok(DeleteObjectTaggingOutput::builder().build());
1201        }
1202
1203        self.exec_rate_limit_objects_per_sec().await;
1204
1205        let result = self
1206            .client
1207            .as_ref()
1208            .unwrap()
1209            .delete_object_tagging()
1210            .bucket(&self.bucket)
1211            .key(&target_key)
1212            .set_version_id(version_id.clone())
1213            .send()
1214            .await
1215            .context("aws_sdk_s3::client::delete_object_tagging() failed.")?;
1216
1217        info!(
1218            key = key,
1219            target_version_id = version_id_str,
1220            target_key = target_key,
1221            "sync(delete tagging only) completed.",
1222        );
1223
1224        Ok(result)
1225    }
1226
1227    async fn is_versioning_enabled(&self) -> Result<bool> {
1228        let result = self
1229            .client
1230            .as_ref()
1231            .unwrap()
1232            .get_bucket_versioning()
1233            .bucket(&self.bucket)
1234            .send()
1235            .await
1236            .context("aws_sdk_s3::client::get_bucket_versioning() failed.")?;
1237
1238        if result.status().is_none() {
1239            return Ok(false);
1240        }
1241
1242        Ok(*result.status().unwrap() == BucketVersioningStatus::Enabled)
1243    }
1244
1245    fn get_client(&self) -> Option<Arc<Client>> {
1246        self.client.clone()
1247    }
1248
1249    fn get_stats_sender(&self) -> Sender<SyncStatistics> {
1250        self.stats_sender.clone()
1251    }
1252
1253    async fn send_stats(&self, stats: SyncStatistics) {
1254        let _ = self.stats_sender.send(stats).await;
1255    }
1256
    /// Not supported for S3 storage; calling this is a programming error and
    /// always panics.
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn get_local_path(&self) -> PathBuf {
        // S3 storage does not have a local path.
        unimplemented!();
    }
1262
1263    fn get_rate_limit_bandwidth(&self) -> Option<Arc<RateLimiter>> {
1264        self.rate_limit_bandwidth.clone()
1265    }
1266
1267    fn generate_copy_source_key(&self, key: &str, version_id: Option<String>) -> String {
1268        let full_key = generate_full_key(&self.prefix, key);
1269        let full_key = urlencoding::encode(&full_key);
1270
1271        if version_id.is_some() {
1272            return format!(
1273                "{}/{}?versionId={}",
1274                &self.bucket,
1275                full_key,
1276                version_id.unwrap()
1277            );
1278        }
1279        format!("{}/{}", &self.bucket, full_key)
1280    }
1281
    /// Records that at least one warning occurred, so the caller can report a
    /// degraded (but not failed) sync.
    fn set_warning(&self) {
        // The fully-qualified path is required: `std::cmp::Ordering` is
        // imported at the top of this file.
        self.has_warning
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }
1286}
1287
/// Removes the first occurrence of `prefix` from `key` and returns the rest.
///
/// Note: `replacen` is not anchored to the start of the string, so only the
/// first occurrence is removed wherever it appears.
pub fn remove_s3_prefix(key: &str, prefix: &str) -> String {
    // `replacen` works directly on `&str`; no intermediate String allocation needed.
    key.replacen(prefix, "", 1)
}
1291
/// Joins the configured prefix and the object key by plain concatenation
/// (no separator is inserted between them).
pub fn generate_full_key(prefix: &str, key: &str) -> String {
    let mut full_key = String::with_capacity(prefix.len() + key.len());
    full_key.push_str(prefix);
    full_key.push_str(key);
    full_key
}
1295
1296fn is_express_onezone_storage(bucket: &str) -> bool {
1297    bucket.ends_with(EXPRESS_ONEZONE_STORAGE_SUFFIX)
1298}
1299
#[cfg(test)]
mod tests {
    //! Unit tests for the S3 storage helpers. No real S3 access is performed:
    //! network-dependent cases use dry-run mode or an unreachable endpoint.
    use super::*;
    use crate::config::args::parse_from_args;
    use crate::types::token::create_pipeline_cancellation_token;
    use tracing_subscriber::EnvFilter;

    // `replacen` removes the first occurrence of the prefix; it is not
    // anchored to the start of the key.
    #[test]
    fn remove_s3_prefix_test() {
        init_dummy_tracing_subscriber();

        assert_eq!(remove_s3_prefix("dir1/data1", "dir1/data1"), "");

        assert_eq!(remove_s3_prefix("dir1/data1", "dir1"), "/data1");
        assert_eq!(remove_s3_prefix("dir1/data1", "dir1/"), "data1");
        assert_eq!(remove_s3_prefix("/dir1/data1", "/dir1"), "/data1");
        assert_eq!(remove_s3_prefix("/dir1/data1", "/dir1/"), "data1");
    }

    // Only the exact "--x-s3" suffix marks an Express One Zone bucket.
    #[test]
    fn is_express_onezone_storage_test() {
        init_dummy_tracing_subscriber();

        assert!(is_express_onezone_storage("bucket--x-s3"));

        assert!(!is_express_onezone_storage("bucket-x-s3"));
        assert!(!is_express_onezone_storage("bucket--x-s3s"));
        assert!(!is_express_onezone_storage("bucket"));
    }

    // Creating a storage from an s3:// source URL should yield an S3 client.
    #[tokio::test]
    async fn create_storage() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(storage.get_client().is_some());
    }

    // An unreachable endpoint makes get_object() fail; the invalid hostname
    // guarantees no network dependency.
    #[tokio::test]
    async fn get_object_error() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--dry-run",
            "--target-access-key",
            "dummy_access_key",
            "--target-secret-access-key",
            "dummy_secret_access_key",
            "--aws-max-attempts",
            "1",
            "--target-endpoint-url",
            "https://invalid-s3-endpoint-url.6329313.local:65535",
            "--force-retry-count",
            "1",
            "--force-retry-interval-milliseconds",
            "1",
            "./test_data/",
            "s3://dummy-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.target.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.target_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(
            storage
                .get_object(
                    "source/data1",
                    None,
                    None,
                    None,
                    None,
                    SseCustomerKey { key: None },
                    None,
                )
                .await
                .is_err()
        );
    }

    // An S3 storage must not report itself as local storage.
    #[tokio::test]
    async fn is_local_storage() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        assert!(storage.get_client().is_some());
        assert!(!storage.is_local_storage());
    }

    // The S3 factory must panic when handed a local (non-s3://) source path.
    #[tokio::test]
    #[should_panic]
    async fn create_storage_panic() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "/source-dir",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, _) = async_channel::unbounded();

        S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;
    }

    // A statistic sent via the storage's sender must arrive at the paired
    // receiver unchanged.
    #[tokio::test]
    async fn stats_channel_test() {
        init_dummy_tracing_subscriber();

        let args = vec![
            "s3sync",
            "--source-access-key",
            "source_access_key",
            "--source-secret-access-key",
            "source_secret_access_key",
            "--target-access-key",
            "target_access_key",
            "--target-secret-access-key",
            "target_secret_access_key",
            "s3://source-bucket",
            "s3://target-bucket",
        ];
        let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
        let (stats_sender, stats_receiver) = async_channel::unbounded();

        let storage = S3StorageFactory::create(
            config.clone(),
            config.source.clone(),
            create_pipeline_cancellation_token(),
            stats_sender,
            config.source_client_config.clone(),
            None,
            None,
            None,
            Arc::new(AtomicBool::new(false)),
            None,
        )
        .await;

        let stats_sender = storage.get_stats_sender();

        stats_sender.send(SyncBytes(0)).await.unwrap();
        assert_eq!(stats_receiver.recv().await.unwrap(), SyncBytes(0));
    }

    // Prefix and key are concatenated verbatim — no separator is added.
    #[tokio::test]
    async fn generate_full_key_test() {
        init_dummy_tracing_subscriber();

        assert_eq!(generate_full_key("dir1/", "data1"), "dir1/data1");
        assert_eq!(generate_full_key("dir1", "data1"), "dir1data1");

        assert_eq!(generate_full_key("data1", "data1"), "data1data1");
    }

    // Installs a no-op tracing subscriber so tests that log don't panic;
    // `try_init` tolerates repeated calls across tests.
    fn init_dummy_tracing_subscriber() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(
                EnvFilter::try_from_default_env()
                    .or_else(|_| EnvFilter::try_new("dummy=trace"))
                    .unwrap(),
            )
            .try_init();
    }
}