use std::{fs, path::Path, sync::Arc};

use crate::{
    errors::{
        Error::{Other, API},
        Result,
    },
    kms::envelope::Envelope,
};
use aws_sdk_s3::{
    error::{CreateBucketError, CreateBucketErrorKind, DeleteBucketError},
    model::{
        BucketCannedAcl, BucketLocationConstraint, CreateBucketConfiguration, Delete, Object,
        ObjectCannedAcl, ObjectIdentifier, PublicAccessBlockConfiguration, ServerSideEncryption,
        ServerSideEncryptionByDefault, ServerSideEncryptionConfiguration, ServerSideEncryptionRule,
    },
    types::{ByteStream, SdkError},
    Client,
};
use aws_types::SdkConfig as AwsSdkConfig;
use log::{debug, info, warn};
use tokio::{fs::File, io::AsyncWriteExt};
use tokio_stream::StreamExt;

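/// Implements an AWS S3 manager that wraps the SDK client built from a shared configuration.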
#[derive(Debug, Clone)]
pub struct Manager {
    #[allow(dead_code)]
    shared_config: AwsSdkConfig,
    cli: Client,
}

impl Manager {
    pub fn new(shared_config: &AwsSdkConfig) -> Self {
        let cloned = shared_config.clone();
        let cli = Client::new(shared_config);
        Self {
            shared_config: cloned,
            cli,
        }
    }

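    /// Creates an S3 bucket in the configured region, blocks all public
    /// access, and enables AES-256 server-side encryption by default.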
    pub async fn create_bucket(&self, s3_bucket: &str) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        let constraint = BucketLocationConstraint::from(reg.to_string().as_str());
        let bucket_cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        info!(
            "creating S3 bucket '{}' in region {}",
            s3_bucket,
            reg.to_string()
        );
        let ret = self
            .cli
            .create_bucket()
            .create_bucket_configuration(bucket_cfg)
            .bucket(s3_bucket)
            .acl(BucketCannedAcl::Private)
            .send()
            .await;
        let already_created = match ret {
            Ok(_) => false,
            Err(e) => {
                if !is_error_bucket_already_exist(&e) {
                    return Err(API {
                        message: format!("failed create_bucket {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
                warn!("bucket already exists ({})", e);
                true
            }
        };
        if already_created {
            return Ok(());
        }
        info!("created S3 bucket '{}'", s3_bucket);

        info!("setting S3 bucket public_access_block configuration to private");
        let public_access_block_cfg = PublicAccessBlockConfiguration::builder()
            .block_public_acls(true)
            .block_public_policy(true)
            .ignore_public_acls(true)
            .restrict_public_buckets(true)
            .build();
        self.cli
            .put_public_access_block()
            .bucket(s3_bucket)
            .public_access_block_configuration(public_access_block_cfg)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_public_access_block {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        let algo = ServerSideEncryption::Aes256;
        let sse = ServerSideEncryptionByDefault::builder()
            .set_sse_algorithm(Some(algo))
            .build();
        let server_side_encryption_rule = ServerSideEncryptionRule::builder()
            .apply_server_side_encryption_by_default(sse)
            .build();
        let server_side_encryption_cfg = ServerSideEncryptionConfiguration::builder()
            .rules(server_side_encryption_rule)
            .build();
        self.cli
            .put_bucket_encryption()
            .bucket(s3_bucket)
            .server_side_encryption_configuration(server_side_encryption_cfg)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_bucket_encryption {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        Ok(())
    }

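    /// Deletes an S3 bucket; a bucket that does not exist is not treated as an error.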
    pub async fn delete_bucket(&self, s3_bucket: &str) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        info!(
            "deleting S3 bucket '{}' in region {}",
            s3_bucket,
            reg.to_string()
        );
        let ret = self.cli.delete_bucket().bucket(s3_bucket).send().await;
        match ret {
            Ok(_) => {}
            Err(e) => {
                if !is_error_bucket_does_not_exist(&e) {
                    return Err(API {
                        message: format!("failed delete_bucket {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
                warn!("bucket already deleted or does not exist ({})", e);
            }
        };
        info!("deleted S3 bucket '{}'", s3_bucket);

        Ok(())
    }

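    /// Deletes all objects in the bucket that match the optional key prefix.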
    pub async fn delete_objects(
        &self,
        s3_bucket: Arc<String>,
        prefix: Option<Arc<String>>,
    ) -> Result<()> {
        let reg = self.shared_config.region().unwrap();
        info!(
            "deleting objects in S3 bucket '{}' in region {} (prefix {:?})",
            s3_bucket,
            reg.to_string(),
            prefix,
        );

        let objects = self.list_objects(s3_bucket.clone(), prefix).await?;
        let mut object_ids: Vec<ObjectIdentifier> = vec![];
        for obj in objects {
            let k = String::from(obj.key().unwrap_or(""));
            let obj_id = ObjectIdentifier::builder().set_key(Some(k)).build();
            object_ids.push(obj_id);
        }

        let n = object_ids.len();
        if n > 0 {
            let deletes = Delete::builder().set_objects(Some(object_ids)).build();
            let ret = self
                .cli
                .delete_objects()
                .bucket(s3_bucket.to_string())
                .delete(deletes)
                .send()
                .await;
            match ret {
                Ok(_) => {}
                Err(e) => {
                    return Err(API {
                        message: format!("failed delete_objects {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
            };
            info!("deleted {} objects in S3 bucket '{}'", n, s3_bucket);
        } else {
            info!("nothing to delete; skipping...");
        }

        Ok(())
    }

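    /// Lists the objects in the bucket that match the optional key prefix,
    /// sorted by descending last-modified timestamp.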
    pub async fn list_objects(
        &self,
        s3_bucket: Arc<String>,
        prefix: Option<Arc<String>>,
    ) -> Result<Vec<Object>> {
        let pfx = {
            if let Some(s) = prefix {
                let s = s.to_string();
                if s.is_empty() {
                    None
                } else {
                    Some(s)
                }
            } else {
                None
            }
        };

        info!("listing bucket {} with prefix '{:?}'", s3_bucket, pfx);
        let mut objects: Vec<Object> = Vec::new();
        let mut token = String::new();
        loop {
            let mut builder = self.cli.list_objects_v2().bucket(s3_bucket.to_string());
            if pfx.is_some() {
                builder = builder.set_prefix(pfx.clone());
            }
            if !token.is_empty() {
                builder = builder.set_continuation_token(Some(token.to_owned()));
            }
            let ret = match builder.send().await {
                Ok(r) => r,
                Err(e) => {
                    return Err(API {
                        message: format!("failed list_objects_v2 {:?}", e),
                        is_retryable: is_error_retryable(&e),
                    });
                }
            };
            if ret.key_count == 0 {
                break;
            }
            if ret.contents.is_none() {
                break;
            }
            let contents = ret.contents.unwrap();
            for obj in contents.iter() {
                let k = obj.key().unwrap_or("");
                if k.is_empty() {
                    return Err(API {
                        message: String::from("empty key returned"),
                        is_retryable: false,
                    });
                }
                debug!("listing [{}]", k);
                objects.push(obj.to_owned());
            }

            token = match ret.next_continuation_token {
                Some(v) => v,
                None => String::new(),
            };
            if token.is_empty() {
                break;
            }
        }

        if objects.len() > 1 {
            info!(
                "sorting {} objects in bucket {} with prefix {:?}",
                objects.len(),
                s3_bucket,
                pfx
            );
            objects.sort_by(|a, b| {
                let a_modified = a.last_modified.unwrap();
                let a_modified = a_modified.as_nanos();

                let b_modified = b.last_modified.unwrap();
                let b_modified = b_modified.as_nanos();

                // reverse comparison so the most recently modified object comes first
                b_modified.cmp(&a_modified)
            });
        }
        Ok(objects)
    }

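    /// Uploads the local file to the S3 bucket under the given key.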
    pub async fn put_object(
        &self,
        file_path: Arc<String>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
    ) -> Result<()> {
        if !Path::new(&file_path.to_string()).exists() {
            return Err(Other {
                message: format!("file path {} does not exist", file_path),
                is_retryable: false,
            });
        }

        let meta = fs::metadata(file_path.as_str()).map_err(|e| Other {
            message: format!("failed metadata {}", e),
            is_retryable: false,
        })?;
        let size = meta.len() as f64;
        info!(
            "starting put_object '{}' (size {}) to 's3://{}/{}'",
            file_path,
            human_readable::bytes(size),
            s3_bucket,
            s3_key
        );

        let byte_stream = ByteStream::from_path(Path::new(file_path.as_str()))
            .await
            .map_err(|e| Other {
                message: format!("failed ByteStream::from_path {}", e),
                is_retryable: false,
            })?;
        self.cli
            .put_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .body(byte_stream)
            .acl(ObjectCannedAcl::Private)
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed put_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        Ok(())
    }

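    /// Downloads the object at the given bucket and key into the local file
    /// path, failing if the file already exists.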
    pub async fn get_object(
        &self,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
        file_path: Arc<String>,
    ) -> Result<()> {
        if Path::new(file_path.as_str()).exists() {
            return Err(Other {
                message: format!("file path {} already exists", file_path),
                is_retryable: false,
            });
        }

        let head_output = self
            .cli
            .head_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed head_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        info!(
            "starting get_object 's3://{}/{}' (content type '{}', size {})",
            s3_bucket,
            s3_key,
            head_output.content_type().unwrap(),
            human_readable::bytes(head_output.content_length() as f64),
        );
        let mut output = self
            .cli
            .get_object()
            .bucket(s3_bucket.to_string())
            .key(s3_key.to_string())
            .send()
            .await
            .map_err(|e| API {
                message: format!("failed get_object {}", e),
                is_retryable: is_error_retryable(&e),
            })?;

        let mut file = File::create(file_path.as_str()).await.map_err(|e| Other {
            message: format!("failed File::create {}", e),
            is_retryable: false,
        })?;

        info!("writing byte stream to file {}", file_path);
        while let Some(d) = output.body.try_next().await.map_err(|e| Other {
            message: format!("failed ByteStream::try_next {}", e),
            is_retryable: false,
        })? {
            file.write_all(&d).await.map_err(|e| API {
                message: format!("failed File.write_all {}", e),
                is_retryable: false,
            })?;
        }
        file.flush().await.map_err(|e| Other {
            message: format!("failed File.flush {}", e),
            is_retryable: false,
        })?;

        Ok(())
    }

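    /// Compresses and seals (envelope-encrypts) the local file, then uploads
    /// the result to the S3 bucket under the given key.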
    pub async fn compress_seal_put_object(
        &self,
        envelope: Arc<Envelope>,
        file_path: Arc<String>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
    ) -> Result<()> {
        info!(
            "compress-seal-put-object: compress and seal '{}'",
            file_path.as_str()
        );
        let compressed_sealed_path = random_manager::tmp_path(10, None).unwrap();
        envelope
            .compress_seal(file_path.clone(), Arc::new(compressed_sealed_path.clone()))
            .await?;

        info!(
            "compress-seal-put-object: upload object '{}'",
            compressed_sealed_path
        );
        self.put_object(
            Arc::new(compressed_sealed_path),
            s3_bucket.clone(),
            s3_key.clone(),
        )
        .await
    }

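    /// Downloads the object at the given bucket and key, then unseals
    /// (envelope-decrypts) and decompresses it into the local file path.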
    pub async fn get_object_unseal_decompress(
        &self,
        envelope: Arc<Envelope>,
        s3_bucket: Arc<String>,
        s3_key: Arc<String>,
        file_path: Arc<String>,
    ) -> Result<()> {
        info!(
            "get-object-unseal-decompress: downloading object {}/{}",
            s3_bucket.as_str(),
            s3_key.as_str()
        );
        let downloaded_path = random_manager::tmp_path(10, None).unwrap();
        self.get_object(
            s3_bucket.clone(),
            s3_key.clone(),
            Arc::new(downloaded_path.clone()),
        )
        .await?;

        info!(
            "get-object-unseal-decompress: unseal and decompress '{}'",
            downloaded_path
        );
        envelope
            .unseal_decompress(Arc::new(downloaded_path), file_path.clone())
            .await
    }
}

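/// Returns `true` if the SDK error is considered retryable
/// (timeout, response error, or dispatch timeout/IO failure).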
#[inline]
pub fn is_error_retryable<E>(e: &SdkError<E>) -> bool {
    match e {
        SdkError::TimeoutError(_) | SdkError::ResponseError { .. } => true,
        SdkError::DispatchFailure(e) => e.is_timeout() || e.is_io(),
        _ => false,
    }
}

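/// Returns `true` if the "create_bucket" error indicates the bucket already exists.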
#[inline]
fn is_error_bucket_already_exist(e: &SdkError<CreateBucketError>) -> bool {
    match e {
        SdkError::ServiceError { err, .. } => {
            matches!(
                err.kind,
                CreateBucketErrorKind::BucketAlreadyExists(_)
                    | CreateBucketErrorKind::BucketAlreadyOwnedByYou(_)
            )
        }
        _ => false,
    }
}

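/// Returns `true` if the "delete_bucket" error indicates the bucket does not exist.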
#[inline]
fn is_error_bucket_does_not_exist(e: &SdkError<DeleteBucketError>) -> bool {
    match e {
        SdkError::ServiceError { err, .. } => {
            let msg = format!("{:?}", err);
            msg.contains("bucket does not exist")
        }
        _ => false,
    }
}

#[test]
fn test_append_slash() {
    let s = "hello";
    assert_eq!(append_slash(s), "hello/");

    let s = "hello/";
    assert_eq!(append_slash(s), "hello/");
}

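/// Appends a trailing "/" to the key if it does not already end with one.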
pub fn append_slash(k: &str) -> String {
    if k.ends_with('/') {
        String::from(k)
    } else {
        format!("{}/", k)
    }
}

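/// Spawns `list_objects` on a separate tokio task.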
pub async fn spawn_list_objects<S>(
    s3_manager: Manager,
    s3_bucket: S,
    prefix: Option<String>,
) -> Result<Vec<Object>>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let pfx = {
        if let Some(s) = prefix {
            if s.is_empty() {
                None
            } else {
                Some(Arc::new(s))
            }
        } else {
            None
        }
    };
    tokio::spawn(async move { s3_manager_arc.list_objects(s3_bucket_arc, pfx).await })
        .await
        .expect("failed spawn await")
}

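/// Spawns `delete_objects` on a separate tokio task.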
pub async fn spawn_delete_objects<S>(
    s3_manager: Manager,
    s3_bucket: S,
    prefix: Option<String>,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let pfx = {
        if let Some(s) = prefix {
            if s.is_empty() {
                None
            } else {
                Some(Arc::new(s))
            }
        } else {
            None
        }
    };
    tokio::spawn(async move { s3_manager_arc.delete_objects(s3_bucket_arc, pfx).await })
        .await
        .expect("failed spawn await")
}

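/// Spawns `put_object` on a separate tokio task.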
pub async fn spawn_put_object<S>(
    s3_manager: Manager,
    file_path: S,
    s3_bucket: S,
    s3_key: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .put_object(file_path_arc, s3_bucket_arc, s3_key_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

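/// Spawns `get_object` on a separate tokio task.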
pub async fn spawn_get_object<S>(
    s3_manager: Manager,
    s3_bucket: S,
    s3_key: S,
    file_path: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .get_object(s3_bucket_arc, s3_key_arc, file_path_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

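/// Spawns `compress_seal_put_object` on a separate tokio task.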
pub async fn spawn_compress_seal_put_object<S>(
    s3_manager: Manager,
    envelope: Envelope,
    file_path: S,
    s3_bucket: S,
    s3_key: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let envelope_arc = Arc::new(envelope);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .compress_seal_put_object(envelope_arc, file_path_arc, s3_bucket_arc, s3_key_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}

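/// Spawns `get_object_unseal_decompress` on a separate tokio task.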
pub async fn spawn_get_object_unseal_decompress<S>(
    s3_manager: Manager,
    envelope: Envelope,
    s3_bucket: S,
    s3_key: S,
    file_path: S,
) -> Result<()>
where
    S: AsRef<str>,
{
    let s3_manager_arc = Arc::new(s3_manager);
    let envelope_arc = Arc::new(envelope);
    let file_path_arc = Arc::new(file_path.as_ref().to_string());
    let s3_bucket_arc = Arc::new(s3_bucket.as_ref().to_string());
    let s3_key_arc = Arc::new(s3_key.as_ref().to_string());
    tokio::spawn(async move {
        s3_manager_arc
            .get_object_unseal_decompress(envelope_arc, s3_bucket_arc, s3_key_arc, file_path_arc)
            .await
    })
    .await
    .expect("failed spawn await")
}