1use crate::error::{Error, Result};
8use async_recursion::async_recursion;
9use aws_sdk_s3::{error::ProvideErrorMetadata, types::ObjectCannedAcl, Client};
10use std::path::{Path, PathBuf};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio_stream::StreamExt;
13
/// Stateless handle for the S3 operations implemented on this type.
///
/// The struct carries no fields, so cloning it is free; every operation
/// builds the AWS client it needs at call time.
#[derive(Clone)]
pub struct S3Repository {}
16
17impl S3Repository {
18 pub async fn upload_file(
19 &self,
20 bucket_name: &str,
21 file_path: &Path,
22 public: bool,
23 ) -> Result<()> {
24 let conf = aws_config::from_env().region("eu-west-2").load().await;
25 let client = Client::new(&conf);
26 let object_key = file_path
27 .file_name()
28 .ok_or_else(|| Error::FilenameNotRetrieved)?
29 .to_str()
30 .ok_or_else(|| Error::FilenameNotRetrieved)?;
31
32 println!("Uploading {object_key} to bucket {bucket_name}");
33
34 let mut file = tokio::fs::File::open(file_path).await?;
35 let mut contents = Vec::new();
36 file.read_to_end(&mut contents).await?;
37
38 let mut req = client
39 .put_object()
40 .bucket(bucket_name)
41 .key(object_key)
42 .body(contents.into());
43 if public {
44 req = req.acl(ObjectCannedAcl::PublicRead);
45 }
46 req.send().await.map_err(|_| {
47 Error::PutS3ObjectError(object_key.to_string(), bucket_name.to_string())
48 })?;
49
50 println!("{object_key} has been uploaded to {bucket_name}");
51 Ok(())
52 }
53
54 pub async fn download_object(
55 &self,
56 bucket_name: &str,
57 object_key: &str,
58 dest_path: &Path,
59 ) -> Result<()> {
60 let conf = aws_config::from_env().region("eu-west-2").load().await;
61 let client = Client::new(&conf);
62 self.retrieve_object(&client, bucket_name, object_key, &dest_path.to_path_buf())
63 .await?;
64 Ok(())
65 }
66
67 pub async fn download_folder(
68 &self,
69 bucket_name: &str,
70 folder_path: &str,
71 dest_path: &Path,
72 ) -> Result<()> {
73 let conf = aws_config::from_env().region("eu-west-2").load().await;
74 let client = Client::new(&conf);
75 tokio::fs::create_dir_all(dest_path).await?;
76 self.list_and_retrieve(&client, bucket_name, folder_path, &dest_path.to_path_buf())
77 .await?;
78 Ok(())
79 }
80
81 pub async fn delete_object(&self, bucket_name: &str, object_key: &str) -> Result<()> {
82 let conf = aws_config::from_env().region("eu-west-2").load().await;
83 let client = Client::new(&conf);
84 self.do_delete_object(&client, bucket_name, object_key)
85 .await?;
86 Ok(())
87 }
88
89 pub async fn delete_folder(&self, bucket_name: &str, folder_path: &str) -> Result<()> {
90 let conf = aws_config::from_env().region("eu-west-2").load().await;
91 let client = Client::new(&conf);
92 self.list_and_delete(&client, bucket_name, folder_path)
93 .await?;
94 Ok(())
95 }
96
97 pub async fn folder_exists(&self, bucket_name: &str, folder_path: &str) -> Result<bool> {
98 let conf = aws_config::from_env().region("eu-west-2").load().await;
99
100 let client = Client::new(&conf);
101 let prefix = if folder_path.ends_with('/') {
102 folder_path.to_string()
103 } else {
104 format!("{folder_path}/")
105 };
106 let output = client
107 .list_objects_v2()
108 .bucket(bucket_name)
109 .prefix(&prefix)
110 .delimiter("/")
111 .send()
112 .await
113 .map_err(|err| Error::ListS3ObjectsError {
114 prefix,
115 error: err.meta().message().unwrap_or_default().to_string(),
116 })?;
117 Ok(!output.contents().unwrap_or_default().is_empty())
118 }
119
120 #[async_recursion]
121 async fn list_and_retrieve(
122 &self,
123 client: &Client,
124 bucket_name: &str,
125 prefix: &str,
126 root_path: &PathBuf,
127 ) -> Result<(), Error> {
128 let output = client
129 .list_objects_v2()
130 .bucket(bucket_name)
131 .prefix(prefix)
132 .delimiter("/")
133 .send()
134 .await
135 .map_err(|err| Error::ListS3ObjectsError {
136 prefix: prefix.to_string(),
137 error: err.meta().message().unwrap_or_default().to_string(),
138 })?;
139
140 if let Some(common_prefixes) = output.common_prefixes {
142 for cp in common_prefixes {
143 let next_prefix = cp.prefix.unwrap();
144 self.list_and_retrieve(client, bucket_name, &next_prefix, root_path)
145 .await?;
146 }
147 }
148
149 if let Some(objects) = output.contents {
150 for object in objects {
151 let object_key = object.key.unwrap();
152 let mut dest_file_path = root_path.clone();
153 dest_file_path.push(&object_key);
154 if dest_file_path.exists() {
155 println!("Has already been retrieved in a previous sync.");
156 continue;
157 }
158 self.retrieve_object(client, bucket_name, &object_key, &dest_file_path)
159 .await?;
160 }
161 }
162
163 Ok(())
164 }
165
166 #[async_recursion]
167 async fn list_and_delete(
168 &self,
169 client: &Client,
170 bucket_name: &str,
171 prefix: &str,
172 ) -> Result<(), Error> {
173 let output = client
174 .list_objects_v2()
175 .bucket(bucket_name)
176 .prefix(prefix)
177 .delimiter("/")
178 .send()
179 .await
180 .map_err(|err| Error::ListS3ObjectsError {
181 prefix: prefix.to_string(),
182 error: err.meta().message().unwrap_or_default().to_string(),
183 })?;
184
185 if let Some(common_prefixes) = output.common_prefixes {
187 for cp in common_prefixes {
188 let next_prefix = cp.prefix.unwrap();
189 self.list_and_delete(client, bucket_name, &next_prefix)
190 .await?;
191 }
192 }
193
194 if let Some(objects) = output.contents {
195 for object in objects {
196 let object_key = object.key.unwrap();
197 self.do_delete_object(client, bucket_name, &object_key)
198 .await?;
199 }
200 }
201
202 Ok(())
203 }
204
205 async fn retrieve_object(
206 &self,
207 client: &Client,
208 bucket_name: &str,
209 object_key: &str,
210 dest_path: &PathBuf,
211 ) -> Result<()> {
212 println!("Retrieving {object_key} from S3...");
213 let mut resp = client
214 .get_object()
215 .bucket(bucket_name)
216 .key(object_key)
217 .send()
218 .await
219 .map_err(|_| {
220 Error::GetS3ObjectError(object_key.to_string(), bucket_name.to_string())
221 })?;
222
223 if let Some(parent) = dest_path.parent() {
224 if !parent.exists() {
225 tokio::fs::create_dir_all(parent).await?;
226 }
227 }
228
229 let mut file = tokio::fs::File::create(&dest_path).await?;
230 while let Some(bytes) = resp
231 .body
232 .try_next()
233 .await
234 .map_err(|_| Error::S3ByteStreamError)?
235 {
236 file.write_all(&bytes).await?;
237 }
238
239 println!("Saved at {}", dest_path.to_string_lossy());
240 Ok(())
241 }
242
243 async fn do_delete_object(
244 &self,
245 client: &Client,
246 bucket_name: &str,
247 object_key: &str,
248 ) -> Result<()> {
249 println!("Deleting {object_key} from S3...");
250 client
251 .delete_object()
252 .bucket(bucket_name)
253 .key(object_key)
254 .send()
255 .await
256 .map_err(|_| {
257 Error::DeleteS3ObjectError(object_key.to_string(), bucket_name.to_string())
258 })?;
259 Ok(())
260 }
261}