sn_testnet_deploy/
s3.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::error::{Error, Result};
8use async_recursion::async_recursion;
9use aws_sdk_s3::{error::ProvideErrorMetadata, types::ObjectCannedAcl, Client};
10use std::path::{Path, PathBuf};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio_stream::StreamExt;
13
14#[derive(Clone)]
15pub struct S3Repository {}
16
17impl S3Repository {
18    pub async fn upload_file(
19        &self,
20        bucket_name: &str,
21        file_path: &Path,
22        public: bool,
23    ) -> Result<()> {
24        let conf = aws_config::from_env().region("eu-west-2").load().await;
25        let client = Client::new(&conf);
26        let object_key = file_path
27            .file_name()
28            .ok_or_else(|| Error::FilenameNotRetrieved)?
29            .to_str()
30            .ok_or_else(|| Error::FilenameNotRetrieved)?;
31
32        println!("Uploading {object_key} to bucket {bucket_name}");
33
34        let mut file = tokio::fs::File::open(file_path).await?;
35        let mut contents = Vec::new();
36        file.read_to_end(&mut contents).await?;
37
38        let mut req = client
39            .put_object()
40            .bucket(bucket_name)
41            .key(object_key)
42            .body(contents.into());
43        if public {
44            req = req.acl(ObjectCannedAcl::PublicRead);
45        }
46        req.send().await.map_err(|_| {
47            Error::PutS3ObjectError(object_key.to_string(), bucket_name.to_string())
48        })?;
49
50        println!("{object_key} has been uploaded to {bucket_name}");
51        Ok(())
52    }
53
54    pub async fn download_object(
55        &self,
56        bucket_name: &str,
57        object_key: &str,
58        dest_path: &Path,
59    ) -> Result<()> {
60        let conf = aws_config::from_env().region("eu-west-2").load().await;
61        let client = Client::new(&conf);
62        self.retrieve_object(&client, bucket_name, object_key, &dest_path.to_path_buf())
63            .await?;
64        Ok(())
65    }
66
67    pub async fn download_folder(
68        &self,
69        bucket_name: &str,
70        folder_path: &str,
71        dest_path: &Path,
72    ) -> Result<()> {
73        let conf = aws_config::from_env().region("eu-west-2").load().await;
74        let client = Client::new(&conf);
75        tokio::fs::create_dir_all(dest_path).await?;
76        self.list_and_retrieve(&client, bucket_name, folder_path, &dest_path.to_path_buf())
77            .await?;
78        Ok(())
79    }
80
81    pub async fn delete_object(&self, bucket_name: &str, object_key: &str) -> Result<()> {
82        let conf = aws_config::from_env().region("eu-west-2").load().await;
83        let client = Client::new(&conf);
84        self.do_delete_object(&client, bucket_name, object_key)
85            .await?;
86        Ok(())
87    }
88
89    pub async fn delete_folder(&self, bucket_name: &str, folder_path: &str) -> Result<()> {
90        let conf = aws_config::from_env().region("eu-west-2").load().await;
91        let client = Client::new(&conf);
92        self.list_and_delete(&client, bucket_name, folder_path)
93            .await?;
94        Ok(())
95    }
96
97    pub async fn folder_exists(&self, bucket_name: &str, folder_path: &str) -> Result<bool> {
98        let conf = aws_config::from_env().region("eu-west-2").load().await;
99
100        let client = Client::new(&conf);
101        let prefix = if folder_path.ends_with('/') {
102            folder_path.to_string()
103        } else {
104            format!("{folder_path}/")
105        };
106        let output = client
107            .list_objects_v2()
108            .bucket(bucket_name)
109            .prefix(&prefix)
110            .delimiter("/")
111            .send()
112            .await
113            .map_err(|err| Error::ListS3ObjectsError {
114                prefix,
115                error: err.meta().message().unwrap_or_default().to_string(),
116            })?;
117        Ok(!output.contents().unwrap_or_default().is_empty())
118    }
119
120    #[async_recursion]
121    async fn list_and_retrieve(
122        &self,
123        client: &Client,
124        bucket_name: &str,
125        prefix: &str,
126        root_path: &PathBuf,
127    ) -> Result<(), Error> {
128        let output = client
129            .list_objects_v2()
130            .bucket(bucket_name)
131            .prefix(prefix)
132            .delimiter("/")
133            .send()
134            .await
135            .map_err(|err| Error::ListS3ObjectsError {
136                prefix: prefix.to_string(),
137                error: err.meta().message().unwrap_or_default().to_string(),
138            })?;
139
140        // So-called 'common prefixes' are subdirectories.
141        if let Some(common_prefixes) = output.common_prefixes {
142            for cp in common_prefixes {
143                let next_prefix = cp.prefix.unwrap();
144                self.list_and_retrieve(client, bucket_name, &next_prefix, root_path)
145                    .await?;
146            }
147        }
148
149        if let Some(objects) = output.contents {
150            for object in objects {
151                let object_key = object.key.unwrap();
152                let mut dest_file_path = root_path.clone();
153                dest_file_path.push(&object_key);
154                if dest_file_path.exists() {
155                    println!("Has already been retrieved in a previous sync.");
156                    continue;
157                }
158                self.retrieve_object(client, bucket_name, &object_key, &dest_file_path)
159                    .await?;
160            }
161        }
162
163        Ok(())
164    }
165
166    #[async_recursion]
167    async fn list_and_delete(
168        &self,
169        client: &Client,
170        bucket_name: &str,
171        prefix: &str,
172    ) -> Result<(), Error> {
173        let output = client
174            .list_objects_v2()
175            .bucket(bucket_name)
176            .prefix(prefix)
177            .delimiter("/")
178            .send()
179            .await
180            .map_err(|err| Error::ListS3ObjectsError {
181                prefix: prefix.to_string(),
182                error: err.meta().message().unwrap_or_default().to_string(),
183            })?;
184
185        // So-called 'common prefixes' are subdirectories.
186        if let Some(common_prefixes) = output.common_prefixes {
187            for cp in common_prefixes {
188                let next_prefix = cp.prefix.unwrap();
189                self.list_and_delete(client, bucket_name, &next_prefix)
190                    .await?;
191            }
192        }
193
194        if let Some(objects) = output.contents {
195            for object in objects {
196                let object_key = object.key.unwrap();
197                self.do_delete_object(client, bucket_name, &object_key)
198                    .await?;
199            }
200        }
201
202        Ok(())
203    }
204
205    async fn retrieve_object(
206        &self,
207        client: &Client,
208        bucket_name: &str,
209        object_key: &str,
210        dest_path: &PathBuf,
211    ) -> Result<()> {
212        println!("Retrieving {object_key} from S3...");
213        let mut resp = client
214            .get_object()
215            .bucket(bucket_name)
216            .key(object_key)
217            .send()
218            .await
219            .map_err(|_| {
220                Error::GetS3ObjectError(object_key.to_string(), bucket_name.to_string())
221            })?;
222
223        if let Some(parent) = dest_path.parent() {
224            if !parent.exists() {
225                tokio::fs::create_dir_all(parent).await?;
226            }
227        }
228
229        let mut file = tokio::fs::File::create(&dest_path).await?;
230        while let Some(bytes) = resp
231            .body
232            .try_next()
233            .await
234            .map_err(|_| Error::S3ByteStreamError)?
235        {
236            file.write_all(&bytes).await?;
237        }
238
239        println!("Saved at {}", dest_path.to_string_lossy());
240        Ok(())
241    }
242
243    async fn do_delete_object(
244        &self,
245        client: &Client,
246        bucket_name: &str,
247        object_key: &str,
248    ) -> Result<()> {
249        println!("Deleting {object_key} from S3...");
250        client
251            .delete_object()
252            .bucket(bucket_name)
253            .key(object_key)
254            .send()
255            .await
256            .map_err(|_| {
257                Error::DeleteS3ObjectError(object_key.to_string(), bucket_name.to_string())
258            })?;
259        Ok(())
260    }
261}