aws_manager/s3/
mod.rs

use std::{collections::HashMap, os::unix::fs::PermissionsExt, path::Path};

use crate::errors::{self, Error, Result};
use aws_sdk_s3::{
    operation::{
        create_bucket::CreateBucketError,
        delete_bucket::DeleteBucketError,
        delete_objects::DeleteObjectsError,
        head_bucket::HeadBucketError,
        head_object::{HeadObjectError, HeadObjectOutput},
        put_bucket_lifecycle_configuration::PutBucketLifecycleConfigurationError,
    },
    primitives::ByteStream,
    types::{
        BucketCannedAcl, BucketLifecycleConfiguration, BucketLocationConstraint,
        CreateBucketConfiguration, Delete, ExpirationStatus, LifecycleExpiration, LifecycleRule,
        LifecycleRuleFilter, Object, ObjectCannedAcl, ObjectIdentifier,
        PublicAccessBlockConfiguration, ServerSideEncryption, ServerSideEncryptionByDefault,
        ServerSideEncryptionConfiguration, ServerSideEncryptionRule,
    },
    Client,
};
use aws_smithy_client::SdkError;
use aws_types::SdkConfig as AwsSdkConfig;
use tokio::{
    fs::{self, File},
    io::AsyncWriteExt,
    time::{sleep, Duration, Instant},
};
use tokio_stream::StreamExt;

/// Implements AWS S3 manager.
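///
/// A minimal construction sketch (assumes this crate is consumed as `aws_manager`
/// and that an `aws_types::SdkConfig` has already been loaded, e.g. with the
/// `aws-config` crate):
///
/// ```no_run
/// # async fn example(shared_config: &aws_types::SdkConfig) {
/// let s3_manager = aws_manager::s3::Manager::new(shared_config);
/// assert!(!s3_manager.region.is_empty());
/// # }
/// ```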
#[derive(Debug, Clone)]
pub struct Manager {
    pub region: String,
    pub cli: Client,
}

impl Manager {
    pub fn new(shared_config: &AwsSdkConfig) -> Self {
        Self {
            // panics if the shared config does not have a region set
            region: shared_config.region().unwrap().to_string(),
            cli: Client::new(shared_config),
        }
    }

    /// Creates an S3 bucket.
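    ///
    /// A minimal usage sketch (the bucket name is hypothetical, and the enclosing
    /// crate is assumed to be consumed as `aws_manager`):
    ///
    /// ```no_run
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// s3_manager.create_bucket("my-example-bucket").await?;
    /// # Ok(())
    /// # }
    /// ```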
    pub async fn create_bucket(&self, s3_bucket: &str) -> Result<()> {
        log::info!("creating bucket '{s3_bucket}' in region '{}'", self.region);

        let mut req = self
            .cli
            .create_bucket()
            .bucket(s3_bucket)
            .acl(BucketCannedAcl::Private);

        // don't specify the location constraint for "us-east-1" -- it is the default
        if self.region != "us-east-1" {
            let constraint = BucketLocationConstraint::from(self.region.as_str());
            let bucket_cfg = CreateBucketConfiguration::builder()
                .location_constraint(constraint)
                .build();
            req = req.create_bucket_configuration(bucket_cfg);
        }

        let ret = req.send().await;
        let already_created = match ret {
            Ok(_) => false,
            Err(e) => {
                if !is_err_already_exists_create_bucket(&e) {
                    return Err(Error::API {
                        message: format!("failed create_bucket {:?}", e),
                        retryable: errors::is_sdk_err_retryable(&e),
                    });
                }
                log::warn!(
                    "bucket already exists so returning early (original error '{}')",
                    e
                );
                true
            }
        };
        if already_created {
            return Ok(());
        }
        log::info!("created bucket '{s3_bucket}'");

        log::info!("setting bucket public_access_block configuration to private");
        let public_access_block_cfg = PublicAccessBlockConfiguration::builder()
            .block_public_acls(true)
            .block_public_policy(true)
            .ignore_public_acls(true)
            .restrict_public_buckets(true)
            .build();
        self.cli
            .put_public_access_block()
            .bucket(s3_bucket)
            .public_access_block_configuration(public_access_block_cfg)
            .send()
            .await
            .map_err(|e| Error::API {
                message: format!("failed put_public_access_block {}", e),
                retryable: errors::is_sdk_err_retryable(&e),
            })?;

        let algo = ServerSideEncryption::Aes256;
        let sse = ServerSideEncryptionByDefault::builder()
            .set_sse_algorithm(Some(algo))
            .build();
        let server_side_encryption_rule = ServerSideEncryptionRule::builder()
            .apply_server_side_encryption_by_default(sse)
            .build();
        let server_side_encryption_cfg = ServerSideEncryptionConfiguration::builder()
            .rules(server_side_encryption_rule)
            .build();
        self.cli
            .put_bucket_encryption()
            .bucket(s3_bucket)
            .server_side_encryption_configuration(server_side_encryption_cfg)
            .send()
            .await
            .map_err(|e| Error::API {
                message: format!("failed put_bucket_encryption {}", e),
                retryable: errors::is_sdk_err_retryable(&e),
            })?;

        Ok(())
    }

    /// Puts an object-expiration lifecycle configuration on the bucket.
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html>
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_LifecycleRule.html>
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_LifecycleExpiration.html>
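    ///
    /// A minimal usage sketch (the bucket name and prefixes are hypothetical);
    /// objects under "logs/" expire after 3 days:
    ///
    /// ```no_run
    /// # use std::collections::HashMap;
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// let days_to_prefixes = HashMap::from([(3, vec!["logs/".to_string()])]);
    /// s3_manager
    ///     .put_bucket_object_expire_configuration("my-example-bucket", days_to_prefixes)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```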
    pub async fn put_bucket_object_expire_configuration(
        &self,
        s3_bucket: &str,
        days_to_prefixes: HashMap<i32, Vec<String>>,
    ) -> Result<()> {
        if days_to_prefixes.is_empty() {
            return Err(Error::Other {
                message: "empty prefixes".to_string(),
                retryable: false,
            });
        }

        log::info!(
            "put bucket object expire configuration for '{s3_bucket}' with prefixes '{:?}' in region '{}'",
            days_to_prefixes, self.region
        );
        let mut rules = Vec::new();
        for (days, pfxs) in days_to_prefixes.iter() {
            for pfx in pfxs {
                rules.push(
                    // ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_LifecycleRule.html>
                    LifecycleRule::builder()
                        // ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_LifecycleRuleFilter.html>
                        .filter(LifecycleRuleFilter::Prefix(pfx.to_owned()))
                        .expiration(LifecycleExpiration::builder().days(days.to_owned()).build())
                        .status(ExpirationStatus::Enabled) // if 'Enabled', the rule is currently being applied
                        .build(),
                );
            }
        }
        let lifecycle = BucketLifecycleConfiguration::builder()
            .set_rules(Some(rules))
            .build();

        // ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html>
        let _ = self
            .cli
            .put_bucket_lifecycle_configuration()
            .bucket(s3_bucket)
            .lifecycle_configuration(lifecycle)
            .send()
            .await
            .map_err(|e| Error::API {
                message: format!(
                    "failed put_bucket_lifecycle_configuration '{}'",
                    explain_err_put_bucket_lifecycle_configuration(&e)
                ),
                retryable: errors::is_sdk_err_retryable(&e),
            })?;

        log::info!("successfully updated bucket lifecycle configuration");
        Ok(())
    }

    /// Deletes an S3 bucket.
    pub async fn delete_bucket(&self, s3_bucket: &str) -> Result<()> {
        log::info!("deleting bucket '{s3_bucket}' in region '{}'", self.region);
        match self.cli.delete_bucket().bucket(s3_bucket).send().await {
            Ok(_) => {
                log::info!("successfully deleted bucket '{s3_bucket}'");
            }
            Err(e) => {
                if !is_err_does_not_exist_delete_bucket(&e) {
                    return Err(Error::API {
                        message: format!(
                            "failed delete_bucket '{}'",
                            explain_err_delete_bucket(&e)
                        ),
                        retryable: errors::is_sdk_err_retryable(&e),
                    });
                }
                log::warn!(
                    "bucket already deleted or does not exist '{}'",
                    explain_err_delete_bucket(&e)
                );
            }
        };

        Ok(())
    }

    /// Returns true if the bucket exists.
    pub async fn bucket_exists(&self, s3_bucket: &str) -> Result<bool> {
        log::info!("checking whether bucket '{s3_bucket}' exists");

        match self.cli.head_bucket().bucket(s3_bucket).send().await {
            Ok(_) => {}
            Err(e) => {
                if is_err_head_bucket_not_found(&e) {
                    return Ok(false);
                }
                return Err(Error::API {
                    message: format!("failed head_bucket '{}'", e),
                    retryable: errors::is_sdk_err_retryable(&e),
                });
            }
        }

        Ok(true)
    }

    /// Deletes objects by "prefix".
    /// If "prefix" is "None", empties the S3 bucket, deleting all objects.
    /// ref. <https://github.com/awslabs/aws-sdk-rust/blob/main/examples/s3/src/bin/delete-objects.rs>
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. <https://tokio.rs/tokio/tutorial/spawning>
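    ///
    /// A minimal usage sketch (bucket name and prefix are hypothetical):
    ///
    /// ```no_run
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// // delete everything under "logs/"; pass None to empty the whole bucket
    /// s3_manager.delete_objects("my-example-bucket", Some("logs/")).await?;
    /// # Ok(())
    /// # }
    /// ```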
    pub async fn delete_objects(&self, s3_bucket: &str, prefix: Option<&str>) -> Result<()> {
        log::info!(
            "deleting objects in bucket '{s3_bucket}' in region '{}' (prefix {:?})",
            self.region,
            prefix,
        );

        if !self.bucket_exists(s3_bucket).await? {
            return Err(Error::API {
                message: format!("bucket '{s3_bucket}' not found"),
                retryable: false,
            });
        }

        let objects = self.list_objects(s3_bucket, prefix).await?;
        let mut object_ids: Vec<ObjectIdentifier> = vec![];
        for obj in objects {
            let k = String::from(obj.key().unwrap_or(""));
            let obj_id = ObjectIdentifier::builder().set_key(Some(k)).build();
            object_ids.push(obj_id);
        }

        let n = object_ids.len();
        if n > 0 {
            let deletes = Delete::builder().set_objects(Some(object_ids)).build();
            match self
                .cli
                .delete_objects()
                .bucket(s3_bucket.to_string())
                .delete(deletes)
                .send()
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    return Err(Error::API {
                        message: format!(
                            "failed delete_objects '{}'",
                            explain_err_delete_objects(&e)
                        ),
                        retryable: errors::is_sdk_err_retryable(&e),
                    });
                }
            };
            log::info!("deleted {} objects in bucket '{s3_bucket}'", n);
        } else {
            log::info!("nothing to delete; skipping...");
        }

        Ok(())
    }

    /// Lists objects in the bucket with an optional prefix,
    /// in descending order of "last_modified" timestamps.
    /// "bucket_name" implies the suffix "/", so there is no need to prefix
    /// the sub-directory with "/".
    /// Passing "bucket_name" + "directory" is enough!
    ///
    /// e.g.
    /// "foo-mydatabucket" for bucket_name
    /// "mydata/myprefix/" for prefix
    pub async fn list_objects(&self, s3_bucket: &str, prefix: Option<&str>) -> Result<Vec<Object>> {
        let pfx = {
            if let Some(s) = prefix {
                let s = s.to_string();
                if s.is_empty() {
                    None
                } else {
                    Some(s)
                }
            } else {
                None
            }
        };

        log::info!(
            "listing bucket '{s3_bucket}' in region '{}' with prefix '{:?}'",
            self.region,
            pfx
        );
        let mut objects: Vec<Object> = Vec::new();
        let mut token = String::new();
        loop {
            let mut builder = self.cli.list_objects_v2().bucket(s3_bucket.to_string());
            if pfx.is_some() {
                builder = builder.set_prefix(pfx.clone());
            }
            if !token.is_empty() {
                builder = builder.set_continuation_token(Some(token.to_owned()));
            }
            let ret = match builder.send().await {
                Ok(r) => r,
                Err(e) => {
                    return Err(Error::API {
                        message: format!("failed list_objects_v2 {:?}", e),
                        retryable: errors::is_sdk_err_retryable(&e),
                    });
                }
            };
            if ret.key_count == 0 {
                break;
            }
            if ret.contents.is_none() {
                break;
            }
            let contents = ret.contents.unwrap();
            for obj in contents.iter() {
                let k = obj.key().unwrap_or("");
                if k.is_empty() {
                    return Err(Error::API {
                        message: String::from("empty key returned"),
                        retryable: false,
                    });
                }
                log::debug!("listing [{}]", k);
                objects.push(obj.to_owned());
            }

            token = match ret.next_continuation_token {
                Some(v) => v,
                None => String::new(),
            };
            if token.is_empty() {
                break;
            }
        }

        if objects.len() > 1 {
            log::info!(
                "sorting {} objects in bucket {s3_bucket} with prefix {:?}",
                objects.len(),
                pfx
            );
            objects.sort_by(|a, b| {
                let a_modified = a.last_modified.unwrap();
                let a_modified = a_modified.as_nanos();

                let b_modified = b.last_modified.unwrap();
                let b_modified = b_modified.as_nanos();

                // reverse comparison!
                // an older file is placed later in the array
                // (latest file first)
                b_modified.cmp(&a_modified)
            });
        }
        Ok(objects)
    }

    /// Writes an object to an S3 bucket using stream.
    ///
    /// WARN: use stream! otherwise it can cause OOM -- don't do the following!
    ///       "fs::read" reads all data onto memory
    ///       ".body(ByteStream::from(contents))" passes the whole data to an API call
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. <https://tokio.rs/tokio/tutorial/spawning>
    ///
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
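    ///
    /// A minimal usage sketch (file path, bucket, and key are hypothetical):
    ///
    /// ```no_run
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// s3_manager
    ///     .put_object("/tmp/data.bin", "my-example-bucket", "backups/data.bin")
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```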
    pub async fn put_object(&self, file_path: &str, s3_bucket: &str, s3_key: &str) -> Result<()> {
        self.put_object_with_metadata(file_path, s3_bucket, s3_key, None)
            .await
    }

    pub async fn put_object_with_retries(
        &self,
        file_path: &str,
        s3_bucket: &str,
        s3_key: &str,
        timeout: Duration,
        interval: Duration,
    ) -> Result<()> {
        self.put_object_with_metadata_with_retries(
            file_path, s3_bucket, s3_key, None, timeout, interval,
        )
        .await
    }

    /// Writes an object with the metadata.
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html>
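    ///
    /// A minimal usage sketch (all names are hypothetical); note that
    /// user-defined metadata keys must carry the "x-amz-meta-" prefix:
    ///
    /// ```no_run
    /// # use std::collections::HashMap;
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// let metadata = HashMap::from([(
    ///     "x-amz-meta-owner".to_string(),
    ///     "my-team".to_string(),
    /// )]);
    /// s3_manager
    ///     .put_object_with_metadata("/tmp/data.bin", "my-example-bucket", "backups/data.bin", Some(metadata))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```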
    pub async fn put_object_with_metadata(
        &self,
        file_path: &str,
        s3_bucket: &str,
        s3_key: &str,
        metadata: Option<HashMap<String, String>>,
    ) -> Result<()> {
        // TODO: this fails for no reason...
        // byte_stream.into_inner().bytes() returns None...
        // use this when we need to stream a very large file
        // let (size, byte_stream) = read_file_to_byte_stream(file_path).await?;
        let b = read_file_to_bytes(file_path)?;
        let size = b.len() as f64;

        log::info!(
            "put object '{file_path}' (size {}) to 's3://{}/{}' (region '{}')",
            human_readable::bytes(size),
            s3_bucket,
            s3_key,
            self.region,
        );
        self.put_bytes_with_metadata_with_retries(
            b,
            s3_bucket,
            s3_key,
            metadata,
            Duration::from_secs(180),
            Duration::from_secs(10),
        )
        .await
    }

    pub async fn put_object_with_metadata_with_retries(
        &self,
        file_path: &str,
        s3_bucket: &str,
        s3_key: &str,
        metadata: Option<HashMap<String, String>>,
        timeout: Duration,
        interval: Duration,
    ) -> Result<()> {
        // TODO: this fails for no reason...
        // byte_stream.into_inner().bytes() returns None...
        // use this when we need to stream a very large file
        // let (size, byte_stream) = read_file_to_byte_stream(file_path).await?;
        let b = read_file_to_bytes(file_path)?;
        let size = b.len() as f64;

        log::info!(
            "put object '{file_path}' (size {}) to 's3://{}/{}' (retries timeout '{:?}', region '{}')",
            human_readable::bytes(size),
            s3_bucket,
            s3_key,
            timeout,
            self.region,
        );

        self.put_bytes_with_metadata_with_retries(b, s3_bucket, s3_key, metadata, timeout, interval)
            .await
    }

    /// Writes a byte stream with the metadata.
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
    /// ref. <https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html>
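    ///
    /// A minimal usage sketch (bucket and key are hypothetical); the stream here
    /// is built from an in-memory buffer just for illustration:
    ///
    /// ```no_run
    /// # use aws_sdk_s3::primitives::ByteStream;
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// let byte_stream = ByteStream::from(b"hello".to_vec());
    /// s3_manager
    ///     .put_byte_stream_with_metadata(byte_stream, "my-example-bucket", "hello.txt", None)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```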
    pub async fn put_byte_stream_with_metadata(
        &self,
        byte_stream: ByteStream,
        s3_bucket: &str,
        s3_key: &str,
        metadata: Option<HashMap<String, String>>,
    ) -> Result<()> {
        log::info!(
            "put_byte_stream_with_metadata to 's3://{}/{}' (region '{}')",
            s3_bucket,
            s3_key,
            self.region
        );

        let mut req = self
            .cli
            .put_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .body(byte_stream)
            .acl(ObjectCannedAcl::Private);
        if let Some(md) = &metadata {
            for (k, v) in md {
                // "user-defined metadata names must begin with x-amz-meta- to distinguish them from other HTTP headers"
                // ref. <https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html>
                if !k.starts_with("x-amz-meta-") {
                    return Err(Error::Other {
                        message: format!(
                            "user-defined metadata key '{}' is missing the prefix 'x-amz-meta-'",
                            k
                        ),
                        retryable: false,
                    });
                }

                // "user-defined metadata is limited to 2 KB in size"
                // ref. <https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html>
                if v.len() > 2048 {
                    return Err(Error::Other {
                        message: format!(
                            "user-defined metadata value is {} bytes, exceeding the 2 KB limit",
                            v.len()
                        ),
                        retryable: false,
                    });
                }

                req = req.metadata(k, v);
            }
        }

        req.send().await.map_err(|e| Error::API {
            message: format!("failed put_object '{}'", e),
            retryable: errors::is_sdk_err_retryable(&e),
        })?;

        Ok(())
    }

    pub async fn put_bytes_with_metadata_with_retries(
        &self,
        b: Vec<u8>,
        s3_bucket: &str,
        s3_key: &str,
        metadata: Option<HashMap<String, String>>,
        timeout: Duration,
        interval: Duration,
    ) -> Result<()> {
        log::info!(
            "put_bytes_with_metadata_with_retries '{s3_bucket}' '{s3_key}' in region '{}' with retries timeout {:?} and interval {:?}",
            self.region,
            timeout,
            interval,
        );

        let start = Instant::now();
        let mut cnt: u128 = 0;
        loop {
            let elapsed = start.elapsed();
            if elapsed.gt(&timeout) {
                return Err(Error::API {
                    message: "put_bytes_with_metadata_with_retries not complete in time"
                        .to_string(),
                    retryable: true,
                });
            }

            let itv = {
                if cnt == 0 {
                    // first poll after a short wait
                    Duration::from_secs(1)
                } else {
                    interval
                }
            };
            sleep(itv).await;

            match self
                .put_byte_stream_with_metadata(
                    ByteStream::from(b.clone()),
                    s3_bucket,
                    s3_key,
                    metadata.clone(),
                )
                .await
            {
                Ok(_) => return Ok(()),
                Err(e) => {
                    if !e.retryable() {
                        return Err(e);
                    }
                }
            }

            cnt += 1;
        }
    }

    /// Returns "None" if the S3 file does not exist.
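    ///
    /// A minimal usage sketch (bucket and key are hypothetical):
    ///
    /// ```no_run
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// match s3_manager.exists("my-example-bucket", "backups/data.bin").await? {
    ///     Some(head) => println!("object exists, size {}", head.content_length()),
    ///     None => println!("object does not exist"),
    /// }
    /// # Ok(())
    /// # }
    /// ```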
    pub async fn exists(&self, s3_bucket: &str, s3_key: &str) -> Result<Option<HeadObjectOutput>> {
        let head_output = match self
            .cli
            .head_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
        {
            Ok(out) => out,
            Err(e) => {
                if is_err_head_object_not_found(&e) {
                    log::info!("{s3_key} not found");
                    return Ok(None);
                }

                log::warn!("failed to head {s3_key}: {}", explain_err_head_object(&e));
                return Err(Error::API {
                    message: format!("failed head_object {}", e),
                    retryable: errors::is_sdk_err_retryable(&e),
                });
            }
        };

        log::info!(
            "head object exists 's3://{}/{}' (content type '{}', size {})",
            s3_bucket,
            s3_key,
            head_output.content_type().unwrap_or("unknown"),
            human_readable::bytes(head_output.content_length() as f64),
        );
        Ok(Some(head_output))
    }

    pub async fn exists_with_retries(
        &self,
        s3_bucket: &str,
        s3_key: &str,
        timeout: Duration,
        interval: Duration,
    ) -> Result<Option<HeadObjectOutput>> {
        log::info!(
            "exists_with_retries '{s3_bucket}' '{s3_key}' with timeout {:?} and interval {:?}",
            timeout,
            interval,
        );

        let start = Instant::now();
        let mut cnt: u128 = 0;
        loop {
            let elapsed = start.elapsed();
            if elapsed.gt(&timeout) {
                return Err(Error::API {
                    message: "exists_with_retries not complete in time".to_string(),
                    retryable: true,
                });
            }

            let itv = {
                if cnt == 0 {
                    // first poll after a short wait
                    Duration::from_secs(1)
                } else {
                    interval
                }
            };
            sleep(itv).await;

            match self.exists(s3_bucket, s3_key).await {
                Ok(head) => return Ok(head),
                Err(e) => {
                    if !e.retryable() {
                        return Err(e);
                    }
                    log::warn!("retryable s3 error '{}'", e);
                }
            }

            cnt += 1;
        }
    }

    /// Downloads an object from an S3 bucket using stream.
    ///
    /// WARN: use stream! otherwise it can cause OOM -- don't do the following!
    ///       "aws_smithy_http::byte_stream::ByteStream.collect" reads all the data into memory
    ///       "File.write_all_buf(&mut bytes)" to write bytes
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. <https://tokio.rs/tokio/tutorial/spawning>
    ///
    /// Returns "true" if the file got downloaded successfully.
    /// Returns "false" if the S3 file does not exist.
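    ///
    /// A minimal usage sketch (bucket, key, and file path are hypothetical):
    ///
    /// ```no_run
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// let downloaded = s3_manager
    ///     .get_object("my-example-bucket", "backups/data.bin", "/tmp/data.bin", true)
    ///     .await?;
    /// assert!(downloaded);
    /// # Ok(())
    /// # }
    /// ```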
    pub async fn get_object(
        &self,
        s3_bucket: &str,
        s3_key: &str,
        file_path: &str,
        overwrite: bool,
    ) -> Result<bool> {
        let need_delete = if Path::new(file_path).exists() {
            if !overwrite {
                return Err(Error::Other {
                    message: format!("file path '{file_path}' already exists and overwrite=false"),
                    retryable: false,
                });
            }
            true
        } else {
            false
        };

        log::info!("checking if the s3 object '{s3_key}' exists before downloading");
        let head_object = self.exists(s3_bucket, s3_key).await?;
        if head_object.is_none() {
            log::warn!("s3 file '{s3_key}' does not exist in the bucket '{s3_bucket}'");
            return Ok(false);
        }

        let mut output = self
            .cli
            .get_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
            .map_err(|e| Error::API {
                message: format!("failed get_object {}", e),
                retryable: errors::is_sdk_err_retryable(&e),
            })?;

        if need_delete {
            log::info!("removing file before creating a new one");
            fs::remove_file(file_path).await.map_err(|e| Error::Other {
                message: format!("failed fs::remove_file {}", e),
                retryable: false,
            })?;
        }
        // ref. <https://docs.rs/tokio-stream/latest/tokio_stream/>
        let mut file = File::create(file_path).await.map_err(|e| Error::Other {
            message: format!("failed File::create {}", e),
            retryable: false,
        })?;

        log::info!("writing byte stream to file {}", file_path);
        while let Some(d) = output.body.try_next().await.map_err(|e| Error::Other {
            message: format!("failed ByteStream::try_next {}", e),
            retryable: false,
        })? {
            file.write_all(&d).await.map_err(|e| Error::Other {
                message: format!("failed File.write_all {}", e),
                retryable: false,
            })?;
        }
        file.flush().await.map_err(|e| Error::Other {
            message: format!("failed File.flush {}", e),
            retryable: false,
        })?;

        Ok(true)
    }

    /// Downloads an object from an S3 bucket using stream, with retries.
    ///
    /// WARN: use stream! otherwise it can cause OOM -- don't do the following!
    ///       "aws_smithy_http::byte_stream::ByteStream.collect" reads all the data into memory
    ///       "File.write_all_buf(&mut bytes)" to write bytes
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. <https://tokio.rs/tokio/tutorial/spawning>
    ///
    /// Returns "true" if the file exists and got downloaded successfully.
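    ///
    /// A minimal usage sketch (all argument values are hypothetical):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// s3_manager
    ///     .get_object_with_retries(
    ///         "my-example-bucket",
    ///         "backups/data.bin",
    ///         "/tmp/data.bin",
    ///         true,
    ///         Duration::from_secs(180),
    ///         Duration::from_secs(10),
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```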
    pub async fn get_object_with_retries(
        &self,
        s3_bucket: &str,
        s3_key: &str,
        file_path: &str,
        overwrite: bool,
        timeout: Duration,
        interval: Duration,
    ) -> Result<bool> {
        log::info!(
            "get_object_with_retries '{s3_bucket}' '{s3_key}' with timeout {:?} and interval {:?}",
            timeout,
            interval,
        );

        let start = Instant::now();
        let mut cnt: u128 = 0;
        loop {
            let elapsed = start.elapsed();
            if elapsed.gt(&timeout) {
                return Err(Error::API {
                    message: "get_object_with_retries not complete in time".to_string(),
                    retryable: true,
                });
            }

            let itv = {
                if cnt == 0 {
                    // first poll after a short wait
                    Duration::from_secs(1)
                } else {
                    interval
                }
            };
            sleep(itv).await;

            match self
                .get_object(s3_bucket, s3_key, file_path, overwrite)
                .await
            {
                Ok(exists) => return Ok(exists),
                Err(e) => {
                    if !e.retryable() {
                        return Err(e);
                    }
                    log::warn!("retryable s3 error '{}'", e);
                }
            }

            cnt += 1;
        }
    }

    /// Returns "true" if successfully downloaded, or if the download was skipped to not overwrite.
    /// Returns "false" if the object does not exist.
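    ///
    /// A minimal usage sketch (all argument values are hypothetical):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # async fn example(s3_manager: &aws_manager::s3::Manager) -> aws_manager::errors::Result<()> {
    /// s3_manager
    ///     .download_executable_with_retries(
    ///         "my-example-bucket",
    ///         "releases/mybinary",
    ///         "/usr/local/bin/mybinary",
    ///         false,
    ///         Duration::from_secs(180),
    ///         Duration::from_secs(10),
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```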
    pub async fn download_executable_with_retries(
        &self,
        s3_bucket: &str,
        source_s3_path: &str,
        target_file_path: &str,
        overwrite: bool,
        timeout: Duration,
        interval: Duration,
    ) -> Result<bool> {
        log::info!("downloading '{source_s3_path}' in bucket '{s3_bucket}', region '{}' to executable '{target_file_path}' (overwrite {overwrite})", self.region);
        let need_download = if Path::new(target_file_path).exists() {
            if overwrite {
                log::warn!(
                    "'{target_file_path}' already exists but overwrite is true, so downloading"
                );
                true
            } else {
                log::warn!(
                    "'{target_file_path}' already exists and overwrite is false, so skipping download"
                );
                false
            }
        } else {
            log::warn!("'{target_file_path}' does not exist, so downloading");
            true
        };

        if !need_download {
            log::info!("skipped download");
            return Ok(true);
        }

        let tmp_path = random_manager::tmp_path(15, None).map_err(|e| Error::API {
            message: format!("failed random_manager::tmp_path {}", e),
            retryable: false,
        })?;

        let mut success = false;

        let start = Instant::now();
        let mut cnt: u128 = 0;
        loop {
            let elapsed = start.elapsed();
            if elapsed.gt(&timeout) {
                return Err(Error::API {
                    message: "download_executable_with_retries not complete in time".to_string(),
                    retryable: true,
                });
            }

            let itv = {
                if cnt == 0 {
                    // first poll after a short wait
                    Duration::from_secs(1)
                } else {
                    interval
                }
            };
            sleep(itv).await;

            log::info!("[ROUND {cnt}] get_object for '{source_s3_path}'");

            match self
                .get_object(s3_bucket, source_s3_path, &tmp_path, overwrite)
                .await
            {
                Ok(exists) => {
                    if exists {
                        success = true;
                        break;
                    }
                    return Ok(exists);
                }
                Err(e) => {
                    if !e.retryable() {
                        return Err(e);
                    }
                    log::warn!("retryable s3 error '{}'", e);
                }
            }

            cnt += 1;
        }
        if !success {
            return Err(Error::API {
                message: "failed get_object after retries".to_string(),
                retryable: false,
            });
        }

        log::info!("successfully downloaded to a temporary file '{tmp_path}'");
        {
            let f = File::open(&tmp_path).await.map_err(|e| Error::API {
                message: format!("failed File::open {}", e),
                retryable: false,
            })?;
            f.set_permissions(PermissionsExt::from_mode(0o777))
                .await
                .map_err(|e| Error::API {
                    message: format!("failed File::set_permissions {}", e),
                    retryable: false,
                })?;
        }

        log::info!("copying '{tmp_path}' to '{target_file_path}'");
        match fs::copy(&tmp_path, &target_file_path).await {
            Ok(_) => log::info!("successfully copied file"),
            Err(e) => {
                // mask the error
                // Os { code: 26, kind: ExecutableFileBusy, message: "Text file busy" }
                if !e.to_string().to_lowercase().contains("text file busy") {
                    return Err(Error::Other {
                        message: format!("failed fs::copy {}", e),
                        retryable: false,
                    });
                }

                log::warn!("failed copy due to the file being in use '{}'", e);
                return Err(Error::Other {
                    message: format!("failed fs::copy {}", e),
                    retryable: true,
                });
            }
        }

        fs::remove_file(&tmp_path).await.map_err(|e| Error::API {
            message: format!("failed fs::remove_file {}", e),
            retryable: false,
        })?;

        Ok(true)
    }
}

#[allow(dead_code)]
async fn read_file_to_byte_stream(file_path: &str) -> Result<(f64, ByteStream)> {
    let file = Path::new(file_path);
    if !file.exists() {
        return Err(Error::Other {
            message: format!("file path '{file_path}' does not exist"),
            retryable: false,
        });
    }

    let meta = fs::metadata(file_path).await.map_err(|e| Error::Other {
        message: format!("failed fs::metadata {}", e),
        retryable: false,
    })?;
    let size = meta.len() as f64;

    let byte_stream = ByteStream::from_path(file)
        .await
        .map_err(|e| Error::Other {
            message: format!("failed ByteStream::from_path {}", e),
            retryable: false,
        })?;
    Ok((size, byte_stream))
}

fn read_file_to_bytes(file_path: &str) -> Result<Vec<u8>> {
    let file: &Path = Path::new(file_path);
    if !file.exists() {
        return Err(Error::Other {
            message: format!("file path '{file_path}' does not exist"),
            retryable: false,
        });
    }

    std::fs::read(file_path).map_err(|e| Error::Other {
        message: format!("failed fs::read {}", e),
        retryable: false,
    })
}

#[inline]
fn is_err_already_exists_create_bucket(
    e: &SdkError<CreateBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> bool {
    match e {
        SdkError::ServiceError(err) => {
            err.err().is_bucket_already_exists() || err.err().is_bucket_already_owned_by_you()
        }
        _ => false,
    }
}

#[inline]
fn explain_err_delete_bucket(
    e: &SdkError<DeleteBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> String {
    match e {
        SdkError::ServiceError(err) => format!(
            "delete_bucket [code '{:?}', message '{:?}']",
            err.err().meta().code(),
            err.err().meta().message(),
        ),
        _ => e.to_string(),
    }
}

#[inline]
fn is_err_does_not_exist_delete_bucket(
    e: &SdkError<DeleteBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> bool {
    match e {
        SdkError::ServiceError(err) => {
            let msg = format!(
                "delete_bucket [code '{:?}', message '{:?}']",
                err.err().meta().code(),
                err.err().meta().message(),
            );
            msg.contains("bucket does not exist") || msg.contains("NoSuchBucket")
        }
        _ => false,
    }
}

#[inline]
fn is_err_head_bucket_not_found(
    e: &SdkError<HeadBucketError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> bool {
    match e {
        SdkError::ServiceError(err) => err.err().is_not_found(),
        _ => false,
    }
}

#[inline]
fn is_err_head_object_not_found(
    e: &SdkError<HeadObjectError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> bool {
    match e {
        SdkError::ServiceError(err) => err.err().is_not_found(),
        _ => false,
    }
}

/// TODO: handle "code" and "message" None if the object does not exist
#[inline]
fn explain_err_head_object(
    e: &SdkError<HeadObjectError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> String {
    match e {
        SdkError::ServiceError(err) => format!(
            "head_object [code '{:?}', message '{:?}']",
            err.err().meta().code(),
            err.err().meta().message(),
        ),
        _ => e.to_string(),
    }
}

#[inline]
fn explain_err_delete_objects(
    e: &SdkError<DeleteObjectsError, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
) -> String {
    match e {
        SdkError::ServiceError(err) => format!(
            "delete_objects [code '{:?}', message '{:?}']",
            err.err().meta().code(),
            err.err().meta().message(),
        ),
        _ => e.to_string(),
    }
}

#[inline]
pub fn explain_err_put_bucket_lifecycle_configuration(
    e: &SdkError<
        PutBucketLifecycleConfigurationError,
        aws_smithy_runtime_api::client::orchestrator::HttpResponse,
    >,
) -> String {
    match e {
        SdkError::ServiceError(err) => format!(
            "put_bucket_lifecycle_configuration [code '{:?}', message '{:?}']",
            err.err().meta().code(),
            err.err().meta().message(),
        ),
        _ => e.to_string(),
    }
}
#[test]
fn test_append_slash() {
    let s = "hello";
    assert_eq!(append_slash(s), "hello/");

    let s = "hello/";
    assert_eq!(append_slash(s), "hello/");
}

/// Appends "/" to the key unless it already ends with "/".
pub fn append_slash(k: &str) -> String {
    if k.ends_with('/') {
        String::from(k)
    } else {
        format!("{}/", k)
    }
}