1use std::{
2 collections::HashMap,
3 {os::unix::fs::PermissionsExt, path::Path},
4};
5
6use crate::errors::{self, Error, Result};
7use aws_sdk_s3::{
8 operation::{
9 create_bucket::CreateBucketError,
10 delete_bucket::DeleteBucketError,
11 delete_objects::DeleteObjectsError,
12 head_bucket::HeadBucketError,
13 head_object::{HeadObjectError, HeadObjectOutput},
14 put_bucket_lifecycle_configuration::PutBucketLifecycleConfigurationError,
15 },
16 primitives::ByteStream,
17 types::{
18 BucketCannedAcl, BucketLifecycleConfiguration, BucketLocationConstraint,
19 CreateBucketConfiguration, Delete, ExpirationStatus, LifecycleExpiration, LifecycleRule,
20 LifecycleRuleFilter, Object, ObjectCannedAcl, ObjectIdentifier,
21 PublicAccessBlockConfiguration, ServerSideEncryption, ServerSideEncryptionByDefault,
22 ServerSideEncryptionConfiguration, ServerSideEncryptionRule,
23 },
24 Client,
25};
26use aws_smithy_client::SdkError;
27use aws_types::SdkConfig as AwsSdkConfig;
28use tokio::{
29 fs::{self, File},
30 io::AsyncWriteExt,
31 time::{sleep, Duration, Instant},
32};
33use tokio_stream::StreamExt;
34
35#[derive(Debug, Clone)]
37pub struct Manager {
38 pub region: String,
39 pub cli: Client,
40}
41
42impl Manager {
43 pub fn new(shared_config: &AwsSdkConfig) -> Self {
44 Self {
45 region: shared_config.region().unwrap().to_string(),
46 cli: Client::new(shared_config),
47 }
48 }
49
50 pub async fn create_bucket(&self, s3_bucket: &str) -> Result<()> {
52 log::info!("creating bucket '{s3_bucket}' in region {}", self.region);
53
54 let mut req = self
55 .cli
56 .create_bucket()
57 .bucket(s3_bucket)
58 .acl(BucketCannedAcl::Private);
59
60 if self.region != "us-east-1" {
62 let constraint = BucketLocationConstraint::from(self.region.as_str());
63 let bucket_cfg = CreateBucketConfiguration::builder()
64 .location_constraint(constraint)
65 .build();
66 req = req.create_bucket_configuration(bucket_cfg);
67 }
68
69 let ret = req.send().await;
70 let already_created = match ret {
71 Ok(_) => false,
72 Err(e) => {
73 if !is_err_already_exists_create_bucket(&e) {
74 return Err(Error::API {
75 message: format!("failed create_bucket {:?}", e),
76 retryable: errors::is_sdk_err_retryable(&e),
77 });
78 }
79 log::warn!(
80 "bucket already exists so returning early (original error '{}')",
81 e
82 );
83 true
84 }
85 };
86 if already_created {
87 return Ok(());
88 }
89 log::info!("created bucket '{s3_bucket}'");
90
91 log::info!("setting bucket public_access_block configuration to private");
92 let public_access_block_cfg = PublicAccessBlockConfiguration::builder()
93 .block_public_acls(true)
94 .block_public_policy(true)
95 .ignore_public_acls(true)
96 .restrict_public_buckets(true)
97 .build();
98 self.cli
99 .put_public_access_block()
100 .bucket(s3_bucket)
101 .public_access_block_configuration(public_access_block_cfg)
102 .send()
103 .await
104 .map_err(|e| Error::API {
105 message: format!("failed put_public_access_block {}", e),
106 retryable: errors::is_sdk_err_retryable(&e),
107 })?;
108
109 let algo = ServerSideEncryption::Aes256;
110 let sse = ServerSideEncryptionByDefault::builder()
111 .set_sse_algorithm(Some(algo))
112 .build();
113 let server_side_encryption_rule = ServerSideEncryptionRule::builder()
114 .apply_server_side_encryption_by_default(sse)
115 .build();
116 let server_side_encryption_cfg = ServerSideEncryptionConfiguration::builder()
117 .rules(server_side_encryption_rule)
118 .build();
119 self.cli
120 .put_bucket_encryption()
121 .bucket(s3_bucket)
122 .server_side_encryption_configuration(server_side_encryption_cfg)
123 .send()
124 .await
125 .map_err(|e| Error::API {
126 message: format!("failed put_bucket_encryption {}", e),
127 retryable: errors::is_sdk_err_retryable(&e),
128 })?;
129
130 Ok(())
131 }
132
133 pub async fn put_bucket_object_expire_configuration(
138 &self,
139 s3_bucket: &str,
140 days_to_prefixes: HashMap<i32, Vec<String>>,
141 ) -> Result<()> {
142 if days_to_prefixes.is_empty() {
143 return Err(Error::Other {
144 message: "empty prefixes".to_string(),
145 retryable: false,
146 });
147 }
148
149 log::info!(
150 "put bucket object expire configuration for '{s3_bucket}' with prefixes '{:?}' in region '{}'",
151 days_to_prefixes, self.region
152 );
153 let mut rules = Vec::new();
154 for (days, pfxs) in days_to_prefixes.iter() {
155 for pfx in pfxs {
156 rules.push(
157 LifecycleRule::builder()
159 .filter(LifecycleRuleFilter::Prefix(pfx.to_owned()))
161 .expiration(LifecycleExpiration::builder().days(days.to_owned()).build())
162 .status(ExpirationStatus::Enabled) .build(),
164 );
165 }
166 }
167 let lifecycle = BucketLifecycleConfiguration::builder()
168 .set_rules(Some(rules))
169 .build();
170
171 let _ = self
173 .cli
174 .put_bucket_lifecycle_configuration()
175 .bucket(s3_bucket)
176 .lifecycle_configuration(lifecycle)
177 .send()
178 .await
179 .map_err(|e| Error::API {
180 message: format!(
181 "failed put_bucket_lifecycle_configuration '{}'",
182 explain_err_put_bucket_lifecycle_configuration(&e)
183 ),
184 retryable: errors::is_sdk_err_retryable(&e),
185 })?;
186
187 log::info!("successfullhy updated bucket lifecycle configuration");
188 Ok(())
189 }
190
191 pub async fn delete_bucket(&self, s3_bucket: &str) -> Result<()> {
193 log::info!("deleting bucket '{s3_bucket}' in region '{}'", self.region);
194 match self.cli.delete_bucket().bucket(s3_bucket).send().await {
195 Ok(_) => {
196 log::info!("successfully deleted bucket '{s3_bucket}'");
197 }
198 Err(e) => {
199 if !is_err_does_not_exist_delete_bucket(&e) {
200 return Err(Error::API {
201 message: format!(
202 "failed delete_bucket '{}'",
203 explain_err_delete_bucket(&e)
204 ),
205 retryable: errors::is_sdk_err_retryable(&e),
206 });
207 }
208 log::warn!(
209 "bucket already deleted or does not exist '{}'",
210 explain_err_delete_bucket(&e)
211 );
212 }
213 };
214
215 Ok(())
216 }
217
218 pub async fn bucket_exists(&self, s3_bucket: &str) -> Result<bool> {
220 log::info!("checking whether bucket '{s3_bucket}' exists");
221
222 match self.cli.head_bucket().bucket(s3_bucket).send().await {
223 Ok(_) => {}
224 Err(e) => {
225 if is_err_head_bucket_not_found(&e) {
226 return Ok(false);
227 }
228 return Err(Error::API {
229 message: format!("failed head_bucket '{}'", e),
230 retryable: errors::is_sdk_err_retryable(&e),
231 });
232 }
233 }
234
235 Ok(true)
236 }
237
238 pub async fn delete_objects(&self, s3_bucket: &str, prefix: Option<&str>) -> Result<()> {
246 log::info!(
247 "deleting objects in bucket '{s3_bucket}' in region '{}' (prefix {:?})",
248 self.region,
249 prefix,
250 );
251
252 if !self.bucket_exists(s3_bucket).await? {
253 return Err(Error::API {
254 message: format!("bucket '{s3_bucket}' not found",),
255 retryable: false,
256 });
257 }
258
259 let objects = self.list_objects(s3_bucket.clone(), prefix).await?;
260 let mut object_ids: Vec<ObjectIdentifier> = vec![];
261 for obj in objects {
262 let k = String::from(obj.key().unwrap_or(""));
263 let obj_id = ObjectIdentifier::builder().set_key(Some(k)).build();
264 object_ids.push(obj_id);
265 }
266
267 let n = object_ids.len();
268 if n > 0 {
269 let deletes = Delete::builder().set_objects(Some(object_ids)).build();
270 match self
271 .cli
272 .delete_objects()
273 .bucket(s3_bucket.to_string())
274 .delete(deletes)
275 .send()
276 .await
277 {
278 Ok(_) => {}
279 Err(e) => {
280 return Err(Error::API {
281 message: format!(
282 "failed delete_objects '{}'",
283 explain_err_delete_objects(&e)
284 ),
285 retryable: errors::is_sdk_err_retryable(&e),
286 });
287 }
288 };
289 log::info!("deleted {} objets in bucket '{s3_bucket}'", n);
290 } else {
291 log::info!("nothing to delete; skipping...");
292 }
293
294 Ok(())
295 }
296
297 pub async fn list_objects(&self, s3_bucket: &str, prefix: Option<&str>) -> Result<Vec<Object>> {
307 let pfx = {
308 if let Some(s) = prefix {
309 let s = s.to_string();
310 if s.is_empty() {
311 None
312 } else {
313 Some(s)
314 }
315 } else {
316 None
317 }
318 };
319
320 log::info!(
321 "listing bucket '{s3_bucket}' in region '{}' with prefix '{:?}'",
322 self.region,
323 pfx
324 );
325 let mut objects: Vec<Object> = Vec::new();
326 let mut token = String::new();
327 loop {
328 let mut builder = self.cli.list_objects_v2().bucket(s3_bucket.to_string());
329 if pfx.is_some() {
330 builder = builder.set_prefix(pfx.clone());
331 }
332 if !token.is_empty() {
333 builder = builder.set_continuation_token(Some(token.to_owned()));
334 }
335 let ret = match builder.send().await {
336 Ok(r) => r,
337 Err(e) => {
338 return Err(Error::API {
339 message: format!("failed list_objects_v2 {:?}", e),
340 retryable: errors::is_sdk_err_retryable(&e),
341 });
342 }
343 };
344 if ret.key_count == 0 {
345 break;
346 }
347 if ret.contents.is_none() {
348 break;
349 }
350 let contents = ret.contents.unwrap();
351 for obj in contents.iter() {
352 let k = obj.key().unwrap_or("");
353 if k.is_empty() {
354 return Err(Error::API {
355 message: String::from("empty key returned"),
356 retryable: false,
357 });
358 }
359 log::debug!("listing [{}]", k);
360 objects.push(obj.to_owned());
361 }
362
363 token = match ret.next_continuation_token {
364 Some(v) => v,
365 None => String::new(),
366 };
367 if token.is_empty() {
368 break;
369 }
370 }
371
372 if objects.len() > 1 {
373 log::info!(
374 "sorting {} objects in bucket {s3_bucket} with prefix {:?}",
375 objects.len(),
376 pfx
377 );
378 objects.sort_by(|a, b| {
379 let a_modified = a.last_modified.unwrap();
380 let a_modified = a_modified.as_nanos();
381
382 let b_modified = b.last_modified.unwrap();
383 let b_modified = b_modified.as_nanos();
384
385 b_modified.cmp(&a_modified)
389 });
390 }
391 Ok(objects)
392 }
393
394 pub async fn put_object(&self, file_path: &str, s3_bucket: &str, s3_key: &str) -> Result<()> {
406 self.put_object_with_metadata(file_path, s3_bucket, s3_key, None)
407 .await
408 }
409
410 pub async fn put_object_with_retries(
411 &self,
412 file_path: &str,
413 s3_bucket: &str,
414 s3_key: &str,
415 timeout: Duration,
416 interval: Duration,
417 ) -> Result<()> {
418 self.put_object_with_metadata_with_retries(
419 file_path, s3_bucket, s3_key, None, timeout, interval,
420 )
421 .await
422 }
423
424 pub async fn put_object_with_metadata(
428 &self,
429 file_path: &str,
430 s3_bucket: &str,
431 s3_key: &str,
432 metadata: Option<HashMap<String, String>>,
433 ) -> Result<()> {
434 let b = read_file_to_bytes(file_path)?;
439 let size = b.len() as f64;
440
441 log::info!(
442 "put object '{file_path}' (size {}) to 's3://{}/{}' (region '{}')",
443 human_readable::bytes(size),
444 s3_bucket,
445 s3_key,
446 self.region,
447 );
448 self.put_bytes_with_metadata_with_retries(
449 b,
450 s3_bucket,
451 s3_key,
452 metadata,
453 Duration::from_secs(180),
454 Duration::from_secs(10),
455 )
456 .await
457 }
458
459 pub async fn put_object_with_metadata_with_retries(
460 &self,
461 file_path: &str,
462 s3_bucket: &str,
463 s3_key: &str,
464 metadata: Option<HashMap<String, String>>,
465 timeout: Duration,
466 interval: Duration,
467 ) -> Result<()> {
468 let b = read_file_to_bytes(file_path)?;
473 let size = b.len() as f64;
474
475 log::info!(
476 "put object '{file_path}' (size {}) to 's3://{}/{}' (retries timeout '{:?}', region '{}')",
477 human_readable::bytes(size),
478 s3_bucket,
479 s3_key,
480 timeout,
481 self.region,
482 );
483
484 self.put_bytes_with_metadata_with_retries(b, s3_bucket, s3_key, metadata, timeout, interval)
485 .await
486 }
487
488 pub async fn put_byte_stream_with_metadata(
492 &self,
493 byte_stream: ByteStream,
494 s3_bucket: &str,
495 s3_key: &str,
496 metadata: Option<HashMap<String, String>>,
497 ) -> Result<()> {
498 log::info!(
499 "put_byte_stream_with_metadata to 's3://{}/{}' (region '{}')",
500 s3_bucket,
501 s3_key,
502 self.region
503 );
504
505 let mut req = self
506 .cli
507 .put_object()
508 .bucket(s3_bucket.to_string())
509 .key(s3_key.to_string())
510 .body(byte_stream)
511 .acl(ObjectCannedAcl::Private);
512 if let Some(md) = &metadata {
513 for (k, v) in md {
514 if !k.starts_with("x-amz-meta-") {
517 return Err(Error::Other {
518 message: format!(
519 "user-defined metadata key '{}' is missing the prefix 'x-amz-meta-'",
520 k
521 ),
522 retryable: false,
523 });
524 }
525
526 if v.len() > 2048 {
529 return Err(Error::Other {
530 message: format!(
531 "user-defined metadata value is {}-byte, exceeds 2 KiB limit",
532 v.len()
533 ),
534 retryable: false,
535 });
536 }
537
538 req = req.metadata(k, v);
539 }
540 }
541
542 req.send().await.map_err(|e| Error::API {
543 message: format!("failed put_object '{}'", e),
544 retryable: errors::is_sdk_err_retryable(&e),
545 })?;
546
547 Ok(())
548 }
549
550 pub async fn put_bytes_with_metadata_with_retries(
551 &self,
552 b: Vec<u8>,
553 s3_bucket: &str,
554 s3_key: &str,
555 metadata: Option<HashMap<String, String>>,
556 timeout: Duration,
557 interval: Duration,
558 ) -> Result<()> {
559 log::info!(
560 "put_bytes_with_metadata_with_retries '{s3_bucket}' '{s3_key}' in region '{}' with retries timeout {:?} and interval {:?}",
561 self.region,
562 timeout,
563 interval,
564 );
565
566 let start = Instant::now();
567 let mut cnt: u128 = 0;
568 loop {
569 let elapsed = start.elapsed();
570 if elapsed.gt(&timeout) {
571 return Err(Error::API {
572 message: "put_byte_with_metadata_with_retries not complete in time".to_string(),
573 retryable: true,
574 });
575 }
576
577 let itv = {
578 if cnt == 0 {
579 Duration::from_secs(1)
581 } else {
582 interval
583 }
584 };
585 sleep(itv).await;
586
587 match self
588 .put_byte_stream_with_metadata(
589 ByteStream::from(b.clone()),
590 s3_bucket,
591 s3_key,
592 metadata.clone(),
593 )
594 .await
595 {
596 Ok(_) => return Ok(()),
597 Err(e) => {
598 if !e.retryable() {
599 return Err(e);
600 }
601 }
602 }
603
604 cnt += 1;
605 }
606 }
607
608 pub async fn exists(&self, s3_bucket: &str, s3_key: &str) -> Result<Option<HeadObjectOutput>> {
610 let head_output = match self
611 .cli
612 .head_object()
613 .bucket(s3_bucket.to_string())
614 .key(s3_key.to_string())
615 .send()
616 .await
617 {
618 Ok(out) => out,
619 Err(e) => {
620 if is_err_head_object_not_found(&e) {
621 log::info!("{s3_key} not found");
622 return Ok(None);
623 }
624
625 log::warn!("failed to head {s3_key}: {}", explain_err_head_object(&e));
626 return Err(Error::API {
627 message: format!("failed head_object {}", e),
628 retryable: errors::is_sdk_err_retryable(&e),
629 });
630 }
631 };
632
633 log::info!(
634 "head object exists 's3://{}/{}' (content type '{}', size {})",
635 s3_bucket,
636 s3_key,
637 head_output.content_type().unwrap(),
638 human_readable::bytes(head_output.content_length() as f64),
639 );
640 Ok(Some(head_output))
641 }
642
643 pub async fn exists_with_retries(
644 &self,
645 s3_bucket: &str,
646 s3_key: &str,
647 timeout: Duration,
648 interval: Duration,
649 ) -> Result<Option<HeadObjectOutput>> {
650 log::info!(
651 "exists_with_retries '{s3_bucket}' '{s3_key}' exists with timeout {:?} and interval {:?}",
652 timeout,
653 interval,
654 );
655
656 let start = Instant::now();
657 let mut cnt: u128 = 0;
658 loop {
659 let elapsed = start.elapsed();
660 if elapsed.gt(&timeout) {
661 return Err(Error::API {
662 message: "exists_with_retries not complete in time".to_string(),
663 retryable: true,
664 });
665 }
666
667 let itv = {
668 if cnt == 0 {
669 Duration::from_secs(1)
671 } else {
672 interval
673 }
674 };
675 sleep(itv).await;
676
677 match self.exists(s3_bucket, s3_key).await {
678 Ok(head) => return Ok(head),
679 Err(e) => {
680 if !e.retryable() {
681 return Err(e);
682 }
683 log::warn!("retriable s3 error '{}'", e);
684 }
685 }
686
687 cnt += 1;
688 }
689 }
690
691 pub async fn get_object(
704 &self,
705 s3_bucket: &str,
706 s3_key: &str,
707 file_path: &str,
708 overwrite: bool,
709 ) -> Result<bool> {
710 let need_delete = if Path::new(file_path).exists() {
711 if !overwrite {
712 return Err(Error::Other {
713 message: format!("file path '{file_path}' already exists and overwrite=false"),
714 retryable: false,
715 });
716 }
717 true
718 } else {
719 false
720 };
721
722 log::info!("checking if the s3 object '{s3_key}' exists before downloading");
723 let head_object = self.exists(s3_bucket, s3_key).await?;
724 if head_object.is_none() {
725 log::warn!("s3 file '{s3_key}' does not exist in the bucket {s3_bucket}");
726 return Ok(false);
727 }
728
729 let mut output = self
730 .cli
731 .get_object()
732 .bucket(s3_bucket.to_string())
733 .key(s3_key.to_string())
734 .send()
735 .await
736 .map_err(|e| Error::API {
737 message: format!("failed get_object {}", e),
738 retryable: errors::is_sdk_err_retryable(&e),
739 })?;
740
741 if need_delete {
742 log::info!("removing file before creating a new one");
743 fs::remove_file(file_path).await.map_err(|e| Error::Other {
744 message: format!("failed fs::remove_file {}", e),
745 retryable: false,
746 })?;
747 }
748 let mut file = File::create(file_path).await.map_err(|e| Error::Other {
750 message: format!("failed File::create {}", e),
751 retryable: false,
752 })?;
753
754 log::info!("writing byte stream to file {}", file_path);
755 while let Some(d) = output.body.try_next().await.map_err(|e| Error::Other {
756 message: format!("failed ByteStream::try_next {}", e),
757 retryable: false,
758 })? {
759 file.write_all(&d).await.map_err(|e| Error::API {
760 message: format!("failed File.write_all {}", e),
761 retryable: false,
762 })?;
763 }
764 file.flush().await.map_err(|e| Error::Other {
765 message: format!("failed File.flush {}", e),
766 retryable: false,
767 })?;
768
769 Ok(true)
770 }
771
772 pub async fn get_object_with_retries(
784 &self,
785 s3_bucket: &str,
786 s3_key: &str,
787 file_path: &str,
788 overwrite: bool,
789 timeout: Duration,
790 interval: Duration,
791 ) -> Result<bool> {
792 log::info!(
793 "get_object_with_retries '{s3_bucket}' '{s3_key}' exists with timeout {:?} and interval {:?}",
794 timeout,
795 interval,
796 );
797
798 let start = Instant::now();
799 let mut cnt: u128 = 0;
800 loop {
801 let elapsed = start.elapsed();
802 if elapsed.gt(&timeout) {
803 return Err(Error::API {
804 message: "get_object_with_retries not complete in time".to_string(),
805 retryable: true,
806 });
807 }
808
809 let itv = {
810 if cnt == 0 {
811 Duration::from_secs(1)
813 } else {
814 interval
815 }
816 };
817 sleep(itv).await;
818
819 match self
820 .get_object(s3_bucket, s3_key, file_path, overwrite)
821 .await
822 {
823 Ok(exists) => return Ok(exists),
824 Err(e) => {
825 if !e.retryable() {
826 return Err(e);
827 }
828 log::warn!("retriable s3 error '{}'", e);
829 }
830 }
831
832 cnt += 1;
833 }
834 }
835
836 pub async fn download_executable_with_retries(
839 &self,
840 s3_bucket: &str,
841 source_s3_path: &str,
842 target_file_path: &str,
843 overwrite: bool,
844 timeout: Duration,
845 interval: Duration,
846 ) -> Result<bool> {
847 log::info!("downloading '{source_s3_path}' in bucket '{s3_bucket}', region '{}' to executable '{target_file_path}' (overwrite {overwrite})", self.region);
848 let need_download = if Path::new(target_file_path).exists() {
849 if overwrite {
850 log::warn!(
851 "'{target_file_path}' already exists but overwrite true thus need download"
852 );
853 true
854 } else {
855 log::warn!(
856 "'{target_file_path}' already exists and overwrite false thus no need download"
857 );
858 false
859 }
860 } else {
861 log::warn!("'{target_file_path}' does not exist thus need download");
862 true
863 };
864
865 if !need_download {
866 log::info!("skipped download");
867 return Ok(true);
868 }
869
870 let tmp_path = random_manager::tmp_path(15, None).map_err(|e| Error::API {
871 message: format!("failed random_manager::tmp_path {}", e),
872 retryable: false,
873 })?;
874
875 let mut success = false;
876
877 let start = Instant::now();
878 let mut cnt: u128 = 0;
879 loop {
880 let elapsed = start.elapsed();
881 if elapsed.gt(&timeout) {
882 return Err(Error::API {
883 message: "get_object_with_retries not complete in time".to_string(),
884 retryable: true,
885 });
886 }
887
888 let itv = {
889 if cnt == 0 {
890 Duration::from_secs(1)
892 } else {
893 interval
894 }
895 };
896 sleep(itv).await;
897
898 log::info!("[ROUND {cnt}] get_object for '{source_s3_path}'");
899
900 match self
901 .get_object(s3_bucket, source_s3_path, &tmp_path, overwrite)
902 .await
903 {
904 Ok(exists) => {
905 if exists {
906 success = true;
907 break;
908 }
909 return Ok(exists);
910 }
911 Err(e) => {
912 if !e.retryable() {
913 return Err(e);
914 }
915 log::warn!("retriable s3 error '{}'", e);
916 }
917 }
918 if success {
919 break;
920 }
921
922 cnt += 1;
923 }
924 if !success {
925 return Err(Error::API {
926 message: "failed get_object after retries".to_string(),
927 retryable: false,
928 });
929 }
930
931 log::info!("successfully downloaded to a temporary file '{tmp_path}'");
932 {
933 let f = File::open(&tmp_path).await.map_err(|e| Error::API {
934 message: format!("failed File::open {}", e),
935 retryable: false,
936 })?;
937 f.set_permissions(PermissionsExt::from_mode(0o777))
938 .await
939 .map_err(|e| Error::API {
940 message: format!("failed File::set_permissions {}", e),
941 retryable: false,
942 })?;
943 }
944
945 log::info!("copying '{tmp_path}' to '{target_file_path}'");
946 match fs::copy(&tmp_path, &target_file_path).await {
947 Ok(_) => log::info!("successfully copied file"),
948 Err(e) => {
949 if !e.to_string().to_lowercase().contains("text file busy") {
952 return Err(Error::Other {
953 message: format!("failed fs::copy {}", e),
954 retryable: false,
955 });
956 }
957
958 log::warn!("failed copy due to file being used '{}'", e);
959 return Err(Error::Other {
960 message: format!("failed fs::copy {}", e),
961 retryable: true,
962 });
963 }
964 }
965
966 fs::remove_file(&tmp_path).await.map_err(|e| Error::API {
967 message: format!("failed fs::remove_file {}", e),
968 retryable: false,
969 })?;
970
971 Ok(true)
972 }
973}
974
975#[allow(dead_code)]
976async fn read_file_to_byte_stream(file_path: &str) -> Result<(f64, ByteStream)> {
977 let file = Path::new(file_path);
978 if !file.exists() {
979 return Err(Error::Other {
980 message: format!("file path '{file_path}' does not exist"),
981 retryable: false,
982 });
983 }
984
985 let meta = fs::metadata(file_path).await.map_err(|e| Error::Other {
986 message: format!("failed fs::metadata {}", e),
987 retryable: false,
988 })?;
989 let size = meta.len() as f64;
990
991 let byte_stream = ByteStream::from_path(file)
992 .await
993 .map_err(|e| Error::Other {
994 message: format!("failed ByteStream::from_file {}", e),
995 retryable: false,
996 })?;
997 Ok((size, byte_stream))
998}
999
1000fn read_file_to_bytes(file_path: &str) -> Result<Vec<u8>> {
1001 let file: &Path = Path::new(file_path);
1002 if !file.exists() {
1003 return Err(Error::Other {
1004 message: format!("file path '{file_path}' does not exist"),
1005 retryable: false,
1006 });
1007 }
1008
1009 std::fs::read(file_path)
1010 .map_err(|e| Error::Other {
1011 message: format!("failed fs::read {}", e),
1012 retryable: false,
1013 })
1014 .map_err(|e| Error::Other {
1015 message: format!("failed read file {}", e),
1016 retryable: false,
1017 })
1018}
1019
1020#[inline]
1021fn is_err_already_exists_create_bucket(
1022 e: &SdkError<CreateBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1023) -> bool {
1024 match e {
1025 SdkError::ServiceError(err) => {
1026 err.err().is_bucket_already_exists() || err.err().is_bucket_already_owned_by_you()
1027 }
1028 _ => false,
1029 }
1030}
1031
1032#[inline]
1033fn explain_err_delete_bucket(
1034 e: &SdkError<DeleteBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1035) -> String {
1036 match e {
1037 SdkError::ServiceError(err) => format!(
1038 "delete_bucket [code '{:?}', message '{:?}']",
1039 err.err().meta().code(),
1040 err.err().meta().message(),
1041 ),
1042 _ => e.to_string(),
1043 }
1044}
1045
1046#[inline]
1047fn is_err_does_not_exist_delete_bucket(
1048 e: &SdkError<DeleteBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1049) -> bool {
1050 match e {
1051 SdkError::ServiceError(err) => {
1052 let msg = format!(
1053 "delete_bucket [code '{:?}', message '{:?}']",
1054 err.err().meta().code(),
1055 err.err().meta().message(),
1056 );
1057 msg.contains("bucket does not exist") || msg.contains("NoSuchBucket")
1058 }
1059 _ => false,
1060 }
1061}
1062
1063#[inline]
1064fn is_err_head_bucket_not_found(
1065 e: &SdkError<HeadBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1066) -> bool {
1067 match e {
1068 SdkError::ServiceError(err) => err.err().is_not_found(),
1069 _ => false,
1070 }
1071}
1072
1073#[inline]
1074fn is_err_head_object_not_found(
1075 e: &SdkError<HeadObjectError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1076) -> bool {
1077 match e {
1078 SdkError::ServiceError(err) => err.err().is_not_found(),
1079 _ => false,
1080 }
1081}
1082
1083#[inline]
1085fn explain_err_head_object(
1086 e: &SdkError<HeadObjectError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1087) -> String {
1088 match e {
1089 SdkError::ServiceError(err) => format!(
1090 "head_object [code '{:?}', message '{:?}']",
1091 err.err().meta().code(),
1092 err.err().meta().message(),
1093 ),
1094 _ => e.to_string(),
1095 }
1096}
1097
1098#[inline]
1099fn explain_err_delete_objects(
1100 e: &SdkError<DeleteObjectsError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
1101) -> String {
1102 match e {
1103 SdkError::ServiceError(err) => format!(
1104 "delete_objects [code '{:?}', message '{:?}']",
1105 err.err().meta().code(),
1106 err.err().meta().message(),
1107 ),
1108 _ => e.to_string(),
1109 }
1110}
1111
1112#[inline]
1113pub fn explain_err_put_bucket_lifecycle_configuration(
1114 e: &SdkError<
1115 PutBucketLifecycleConfigurationError,
1116 aws_smithy_runtime_api::client::orchestrator::HttpResponse,
1117 >,
1118) -> String {
1119 match e {
1120 SdkError::ServiceError(err) => format!(
1121 "put_bucket_lifecycle_configuration [code '{:?}', message '{:?}']",
1122 err.err().meta().code(),
1123 err.err().meta().message(),
1124 ),
1125 _ => e.to_string(),
1126 }
1127}
1128
1129#[test]
1130fn test_append_slash() {
1131 let s = "hello";
1132 assert_eq!(append_slash(s), "hello/");
1133
1134 let s = "hello/";
1135 assert_eq!(append_slash(s), "hello/");
1136}
1137
1138pub fn append_slash(k: &str) -> String {
1139 let n = k.len();
1140 if &k[n - 1..] == "/" {
1141 String::from(k)
1142 } else {
1143 format!("{}/", k)
1144 }
1145}