aws_sdk_manager/
s3.rs

use std::{fs, path::Path, sync::Arc};

use crate::{
    errors::{
        Error::{Other, API},
        Result,
    },
    kms::envelope::Envelope,
};
use aws_sdk_s3::{
    error::{CreateBucketError, CreateBucketErrorKind, DeleteBucketError},
    model::{
        BucketCannedAcl, BucketLocationConstraint, CreateBucketConfiguration, Delete, Object,
        ObjectCannedAcl, ObjectIdentifier, PublicAccessBlockConfiguration, ServerSideEncryption,
        ServerSideEncryptionByDefault, ServerSideEncryptionConfiguration, ServerSideEncryptionRule,
    },
    types::{ByteStream, SdkError},
    Client,
};
use aws_types::SdkConfig as AwsSdkConfig;
use log::{debug, info, warn};
use tokio::{fs::File, io::AsyncWriteExt};
use tokio_stream::StreamExt;

/// Implements AWS S3 manager.
#[derive(Debug, Clone)]
pub struct Manager {
    #[allow(dead_code)]
    shared_config: AwsSdkConfig,
    cli: Client,
}

impl Manager {
    pub fn new(shared_config: &AwsSdkConfig) -> Self {
        let cloned = shared_config.clone();
        let cli = Client::new(shared_config);
        Self {
            shared_config: cloned,
            cli,
        }
    }
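
    // A minimal construction sketch (not from the original source). It
    // assumes the `aws-config` crate is available to load the shared AWS
    // config, and that the caller is inside a tokio runtime:
    //
    //   let shared_config = aws_config::load_from_env().await;
    //   let s3_manager = Manager::new(&shared_config);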

    /// Creates an S3 bucket, blocks public access, and enables
    /// AES-256 server-side encryption by default.
    /// Returns "Ok(())" if the bucket already exists.
    pub async fn create_bucket(&self, s3_bucket: &str) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        let constraint = BucketLocationConstraint::from(reg.to_string().as_str());
        let bucket_cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        info!(
            "creating S3 bucket '{}' in region {}",
            s3_bucket,
            reg.to_string()
        );
        let ret = self
            .cli
            .create_bucket()
            .create_bucket_configuration(bucket_cfg)
            .bucket(s3_bucket)
            .acl(BucketCannedAcl::Private)
            .send()
            .await;
        let already_created = match ret {
            Ok(_) => false,
            Err(e) => {
                if !is_error_bucket_already_exist(&e) {
                    return Err(API {
                        message: format!("failed create_bucket {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
                warn!("bucket already exists ({})", e);
                true
            }
        };
        if already_created {
            return Ok(());
        }
        info!("created S3 bucket '{}'", s3_bucket);

        info!("setting S3 bucket public_access_block configuration to private");
        let public_access_block_cfg = PublicAccessBlockConfiguration::builder()
            .block_public_acls(true)
            .block_public_policy(true)
            .ignore_public_acls(true)
            .restrict_public_buckets(true)
            .build();
        self.cli
            .put_public_access_block()
            .bucket(s3_bucket)
            .public_access_block_configuration(public_access_block_cfg)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_public_access_block {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        let algo = ServerSideEncryption::Aes256;
        let sse = ServerSideEncryptionByDefault::builder()
            .set_sse_algorithm(Some(algo))
            .build();
        let server_side_encryption_rule = ServerSideEncryptionRule::builder()
            .apply_server_side_encryption_by_default(sse)
            .build();
        let server_side_encryption_cfg = ServerSideEncryptionConfiguration::builder()
            .rules(server_side_encryption_rule)
            .build();
        self.cli
            .put_bucket_encryption()
            .bucket(s3_bucket)
            .server_side_encryption_configuration(server_side_encryption_cfg)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_bucket_encryption {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        Ok(())
    }

    /// Deletes an S3 bucket.
    /// Returns "Ok(())" if the bucket does not exist.
    pub async fn delete_bucket(&self, s3_bucket: &str) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        info!(
            "deleting S3 bucket '{}' in region {}",
            s3_bucket,
            reg.to_string()
        );
        let ret = self.cli.delete_bucket().bucket(s3_bucket).send().await;
        match ret {
            Ok(_) => {}
            Err(e) => {
                if !is_error_bucket_does_not_exist(&e) {
                    return Err(API {
                        message: format!("failed delete_bucket {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
                warn!("bucket already deleted or does not exist ({})", e);
            }
        };
        info!("deleted S3 bucket '{}'", s3_bucket);

        Ok(())
    }
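
    // A minimal bucket-lifecycle sketch (not from the original source;
    // assumes valid AWS credentials). Both calls are effectively idempotent:
    // "create_bucket" returns Ok(()) when the bucket already exists, and
    // "delete_bucket" returns Ok(()) when it is already gone:
    //
    //   s3_manager.create_bucket("my-bucket").await?;
    //   s3_manager.delete_bucket("my-bucket").await?;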

    /// Deletes objects by "prefix".
    /// If "prefix" is "None", empties the S3 bucket, deleting all files.
    /// ref. https://github.com/awslabs/aws-sdk-rust/blob/main/examples/s3/src/bin/delete-objects.rs
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. https://tokio.rs/tokio/tutorial/spawning
    pub async fn delete_objects(
        &self,
        s3_bucket: Arc<String>,
        prefix: Option<Arc<String>>,
    ) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        info!(
            "deleting objects in S3 bucket '{}' in region {} (prefix {:?})",
            s3_bucket,
            reg.to_string(),
            prefix,
        );

        let objects = self.list_objects(s3_bucket.clone(), prefix).await?;
        let mut object_ids: Vec<ObjectIdentifier> = vec![];
        for obj in objects {
            let k = String::from(obj.key().unwrap_or(""));
            let obj_id = ObjectIdentifier::builder().set_key(Some(k)).build();
            object_ids.push(obj_id);
        }

        let n = object_ids.len();
        if n > 0 {
            // NOTE: the S3 "DeleteObjects" API accepts at most 1,000 keys
            // per request; larger listings would need to be chunked.
            let deletes = Delete::builder().set_objects(Some(object_ids)).build();
            let ret = self
                .cli
                .delete_objects()
                .bucket(s3_bucket.to_string())
                .delete(deletes)
                .send()
                .await;
            match ret {
                Ok(_) => {}
                Err(e) => {
                    return Err(API {
                        message: format!("failed delete_objects {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
            };
            info!("deleted {} objects in S3 bucket '{}'", n, s3_bucket);
        } else {
            info!("nothing to delete; skipping...");
        }

        Ok(())
    }
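
    // A usage sketch for the Arc-based signature (not from the original
    // source). Arguments are wrapped in "Arc" so the same values can also be
    // moved into spawned tasks, per the tokio guidance quoted above:
    //
    //   let bucket = Arc::new(String::from("my-bucket"));
    //   let prefix = Some(Arc::new(String::from("mydata/myprefix/")));
    //   s3_manager.delete_objects(bucket, prefix).await?;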

    /// Lists objects in the bucket with an optional prefix,
    /// in descending order of "last_modified" timestamps (newest first).
    /// The bucket name already implies the trailing "/", so there is no need
    /// to start the prefix with "/"; passing the bucket name plus the
    /// sub-directory path is enough.
    ///
    /// e.g.,
    /// "foo-mydatabucket" for s3_bucket
    /// "mydata/myprefix/" for prefix
    pub async fn list_objects(
        &self,
        s3_bucket: Arc<String>,
        prefix: Option<Arc<String>>,
    ) -> Result<Vec<Object>> {
        let pfx = {
            if let Some(s) = prefix {
                let s = s.to_string();
                if s.is_empty() {
                    None
                } else {
                    Some(s)
                }
            } else {
                None
            }
        };

        info!("listing bucket {} with prefix '{:?}'", s3_bucket, pfx);
        let mut objects: Vec<Object> = Vec::new();
        let mut token = String::new();
        loop {
            let mut builder = self.cli.list_objects_v2().bucket(s3_bucket.to_string());
            if pfx.is_some() {
                builder = builder.set_prefix(pfx.clone());
            }
            if !token.is_empty() {
                builder = builder.set_continuation_token(Some(token.to_owned()));
            }
            let ret = match builder.send().await {
                Ok(r) => r,
                Err(e) => {
                    return Err(API {
                        message: format!("failed list_objects_v2 {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
            };
            if ret.key_count == 0 {
                break;
            }
            if ret.contents.is_none() {
                break;
            }
            let contents = ret.contents.unwrap();
            for obj in contents.iter() {
                let k = obj.key().unwrap_or("");
                if k.is_empty() {
                    return Err(API {
                        message: String::from("empty key returned"),
                        is_retryable: false,
                    });
                }
                debug!("listing [{}]", k);
                objects.push(obj.to_owned());
            }

            token = match ret.next_continuation_token {
                Some(v) => v,
                None => String::new(),
            };
            if token.is_empty() {
                break;
            }
        }

        if objects.len() > 1 {
            info!(
                "sorting {} objects in bucket {} with prefix {:?}",
                objects.len(),
                s3_bucket,
                pfx
            );
            objects.sort_by(|a, b| {
                let a_modified = a.last_modified.unwrap();
                let a_modified = a_modified.as_nanos();

                let b_modified = b.last_modified.unwrap();
                let b_modified = b_modified.as_nanos();

                // reverse comparison: the newest object (largest timestamp)
                // comes first; older objects are placed later in the array
                b_modified.cmp(&a_modified)
            });
        }
        Ok(objects)
    }
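
    // A minimal listing sketch (not from the original source); the returned
    // vector is sorted newest-first by "last_modified":
    //
    //   let objects = s3_manager
    //       .list_objects(
    //           Arc::new(String::from("foo-mydatabucket")),
    //           Some(Arc::new(String::from("mydata/myprefix/"))),
    //       )
    //       .await?;
    //   for obj in &objects {
    //       println!("found: {}", obj.key().unwrap_or(""));
    //   }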

    /// Writes an object to an S3 bucket using a stream.
    ///
    /// WARN: use a stream! Otherwise it can cause OOM -- don't do the following:
    ///       "fs::read" reads all data into memory
    ///       ".body(ByteStream::from(contents))" passes the whole data to an API call
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. https://tokio.rs/tokio/tutorial/spawning
    pub async fn put_object(
        &self,
        file_path: Arc<String>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
    ) -> Result<()> {
        if !Path::new(&file_path.to_string()).exists() {
            return Err(Other {
                message: format!("file path {} does not exist", file_path),
                is_retryable: false,
            });
        }

        let meta = fs::metadata(file_path.as_str()).map_err(|e| Other {
            message: format!("failed metadata {}", e),
            is_retryable: false,
        })?;
        let size = meta.len() as f64;
        info!(
            "starting put_object '{}' (size {}) to 's3://{}/{}'",
            file_path,
            human_readable::bytes(size),
            s3_bucket,
            s3_key
        );

        let byte_stream = ByteStream::from_path(Path::new(file_path.as_str()))
            .await
            .map_err(|e| Other {
                message: format!("failed ByteStream::from_path {}", e),
                is_retryable: false,
            })?;
        self.cli
            .put_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .body(byte_stream)
            .acl(ObjectCannedAcl::Private)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        Ok(())
    }
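
    // A minimal upload sketch (not from the original source); the file body
    // is streamed from disk rather than read fully into memory:
    //
    //   s3_manager
    //       .put_object(
    //           Arc::new(String::from("/tmp/mydata.bin")),
    //           Arc::new(String::from("my-bucket")),
    //           Arc::new(String::from("mydata/myprefix/mydata.bin")),
    //       )
    //       .await?;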

    /// Downloads an object from an S3 bucket using a stream.
    ///
    /// WARN: use a stream! Otherwise it can cause OOM -- don't do the following:
    ///       "aws_smithy_http::byte_stream::ByteStream.collect" reads all the data into memory
    ///       "File.write_all_buf(&mut bytes)" to write the bytes
    ///
    /// "If a single piece of data must be accessible from more than one task
    /// concurrently, then it must be shared using synchronization primitives such as Arc."
    /// ref. https://tokio.rs/tokio/tutorial/spawning
    pub async fn get_object(
        &self,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
        file_path: Arc<String>,
    ) -> Result<()> {
        if Path::new(file_path.as_str()).exists() {
            return Err(Other {
                message: format!("file path {} already exists", file_path),
                is_retryable: false,
            });
        }

        let head_output = self
            .cli
            .head_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed head_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        info!(
            "starting get_object 's3://{}/{}' (content type '{}', size {})",
            s3_bucket,
            s3_key,
            head_output.content_type().unwrap_or("unknown"),
            human_readable::bytes(head_output.content_length() as f64),
        );
        let mut output = self
            .cli
            .get_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed get_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        // ref. https://docs.rs/tokio-stream/latest/tokio_stream/
        let mut file = File::create(file_path.as_str()).await.map_err(|e| Other {
            message: format!("failed File::create {}", e),
            is_retryable: false,
        })?;

        info!("writing byte stream to file {}", file_path);
        while let Some(d) = output.body.try_next().await.map_err(|e| Other {
            message: format!("failed ByteStream::try_next {}", e),
            is_retryable: false,
        })? {
            // local filesystem failure, not an API error
            file.write_all(&d).await.map_err(|e| Other {
                message: format!("failed File.write_all {}", e),
                is_retryable: false,
            })?;
        }
        file.flush().await.map_err(|e| Other {
            message: format!("failed File.flush {}", e),
            is_retryable: false,
        })?;

        Ok(())
    }
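
    // A minimal download sketch (not from the original source); note that the
    // call fails if the destination path already exists:
    //
    //   s3_manager
    //       .get_object(
    //           Arc::new(String::from("my-bucket")),
    //           Arc::new(String::from("mydata/myprefix/mydata.bin")),
    //           Arc::new(String::from("/tmp/mydata.bin")),
    //       )
    //       .await?;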

    /// Compresses the file, encrypts, and uploads to S3.
    pub async fn compress_seal_put_object(
        &self,
        envelope: Arc<Envelope>,
        file_path: Arc<String>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
    ) -> Result<()> {
        info!(
            "compress-seal-put-object: compress and seal '{}'",
            file_path.as_str()
        );
        let compressed_sealed_path = random_manager::tmp_path(10, None).unwrap();
        envelope
            .compress_seal(file_path.clone(), Arc::new(compressed_sealed_path.clone()))
            .await?;

        info!(
            "compress-seal-put-object: upload object '{}'",
            compressed_sealed_path
        );
        self.put_object(
            Arc::new(compressed_sealed_path),
            s3_bucket.clone(),
            s3_key.clone(),
        )
        .await
    }

    /// Reverse of "compress_seal_put_object".
    pub async fn get_object_unseal_decompress(
        &self,
        envelope: Arc<Envelope>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
        file_path: Arc<String>,
    ) -> Result<()> {
        info!(
            "get-object-unseal-decompress: downloading object {}/{}",
            s3_bucket.as_str(),
            s3_key.as_str()
        );
        let downloaded_path = random_manager::tmp_path(10, None).unwrap();
        self.get_object(
            s3_bucket.clone(),
            s3_key.clone(),
            Arc::new(downloaded_path.clone()),
        )
        .await?;

        info!(
            "get-object-unseal-decompress: unseal and decompress '{}'",
            downloaded_path
        );
        envelope
            .unseal_decompress(Arc::new(downloaded_path), file_path.clone())
            .await
    }
}
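
// A round-trip sketch for the envelope helpers (not from the original
// source). "envelope" is a "kms::envelope::Envelope" configured elsewhere;
// cloning it for the first call is assumed here for illustration:
//
//   s3_manager
//       .compress_seal_put_object(
//           Arc::new(envelope.clone()),
//           Arc::new(String::from("/tmp/plain.bin")),
//           Arc::new(String::from("my-bucket")),
//           Arc::new(String::from("backups/plain.bin.sealed")),
//       )
//       .await?;
//   s3_manager
//       .get_object_unseal_decompress(
//           Arc::new(envelope),
//           Arc::new(String::from("my-bucket")),
//           Arc::new(String::from("backups/plain.bin.sealed")),
//           Arc::new(String::from("/tmp/plain.roundtrip.bin")),
//       )
//       .await?;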

/// Returns true if the error is transient (e.g., a timeout, a response
/// error, or a dispatch-level I/O failure) and the request may be retried.
#[inline]
pub fn is_error_retryable<E>(e: &SdkError<E>) -> bool {
    match e {
        SdkError::TimeoutError(_) | SdkError::ResponseError { .. } => true,
        SdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
        _ => false,
    }
}
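
// A caller-side sketch (not from the original source) showing how the
// propagated "is_retryable" flag can drive a retry decision:
//
//   use crate::errors::Error;
//   match s3_manager.create_bucket("my-bucket").await {
//       Err(Error::API { is_retryable: true, .. }) => { /* back off and retry */ }
//       Err(e) => return Err(e),
//       Ok(_) => {}
//   }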

/// Returns true if the "create_bucket" error indicates the bucket
/// already exists or is already owned by the caller.
#[inline]
fn is_error_bucket_already_exist(e: &SdkError<CreateBucketError>) -> bool {
    match e {
        SdkError::ServiceError { err, .. } => {
            matches!(
                err.kind,
                CreateBucketErrorKind::BucketAlreadyExists(_)
                    | CreateBucketErrorKind::BucketAlreadyOwnedByYou(_)
            )
        }
        _ => false,
    }
}

/// Returns true if the "delete_bucket" error indicates the bucket is
/// already gone. "DeleteBucketError" exposes no dedicated variant for a
/// missing bucket, so this falls back to matching the error message.
#[inline]
fn is_error_bucket_does_not_exist(e: &SdkError<DeleteBucketError>) -> bool {
    match e {
        SdkError::ServiceError { err, .. } => {
            let msg = format!("{:?}", err);
            msg.contains("bucket does not exist")
        }
        _ => false,
    }
}

/// Appends a trailing '/' to the key if it does not already end with one.
pub fn append_slash(k: &str) -> String {
    if k.ends_with('/') {
        String::from(k)
    } else {
        format!("{}/", k)
    }
}

#[test]
fn test_append_slash() {
    let s = "hello";
    assert_eq!(append_slash(s), "hello/");

    let s = "hello/";
    assert_eq!(append_slash(s), "hello/");

    // the previous slice-based implementation panicked on an empty key
    assert_eq!(append_slash(""), "/");
}

/// Runs "Manager::list_objects" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_list_objects<S>(
    s3_manager: Manager,
    s3_bucket: S,
    prefix: Option<String>,
) -> Result<Vec<Object>>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let pfx = {
        if let Some(s) = prefix {
            if s.is_empty() {
                None
            } else {
                Some(Arc::new(s))
            }
        } else {
            None
        }
    };
    tokio::spawn(async move { s3_manager_arc.list_objects(s3_bucket_arc, pfx).await })
        .await
        .expect("failed spawn await")
}
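
// A usage sketch for the spawn_* helpers (not from the original source).
// They accept plain string arguments, do the "Arc" wrapping internally, and
// run the operation on a spawned tokio task ("Manager" derives "Clone"):
//
//   let objects = spawn_list_objects(
//       s3_manager.clone(),
//       "foo-mydatabucket",
//       Some(String::from("mydata/myprefix/")),
//   )
//   .await?;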

/// Runs "Manager::delete_objects" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_delete_objects<S>(
    s3_manager: Manager,
    s3_bucket: S,
    prefix: Option<String>,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let pfx = {
        if let Some(s) = prefix {
            if s.is_empty() {
                None
            } else {
                Some(Arc::new(s))
            }
        } else {
            None
        }
    };
    tokio::spawn(async move { s3_manager_arc.delete_objects(s3_bucket_arc, pfx).await })
        .await
        .expect("failed spawn await")
}

/// Runs "Manager::put_object" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_put_object<S>(
    s3_manager: Manager,
    file_path: S,
    s3_bucket: S,
    s3_key: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .put_object(file_path_arc, s3_bucket_arc, s3_key_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

/// Runs "Manager::get_object" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_get_object<S>(
    s3_manager: Manager,
    s3_bucket: S,
    s3_key: S,
    file_path: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .get_object(s3_bucket_arc, s3_key_arc, file_path_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

/// Runs "Manager::compress_seal_put_object" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_compress_seal_put_object<S>(
    s3_manager: Manager,
    envelope: Envelope,
    file_path: S,
    s3_bucket: S,
    s3_key: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let envelope_arc = Arc::new(envelope);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .compress_seal_put_object(envelope_arc, file_path_arc, s3_bucket_arc, s3_key_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

/// Runs "Manager::get_object_unseal_decompress" on a spawned tokio task,
/// wrapping the arguments in "Arc"s.
pub async fn spawn_get_object_unseal_decompress<S>(
    s3_manager: Manager,
    envelope: Envelope,
    s3_bucket: S,
    s3_key: S,
    file_path: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let envelope_arc = Arc::new(envelope);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .get_object_unseal_decompress(envelope_arc, s3_bucket_arc, s3_key_arc, file_path_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}