1#![allow(clippy::default_trait_access)]
22#![allow(clippy::module_name_repetitions)]
23#![allow(clippy::redundant_closure)]
24#![allow(clippy::redundant_closure_for_method_calls)]
25#![allow(clippy::should_implement_trait)]
26#![allow(clippy::must_use_candidate)]
27#![allow(clippy::missing_errors_doc)]
28#![allow(clippy::type_repetition_in_bounds)]
29
30pub mod iter;
31use crate::iter::{GetObjectStream, ObjectStream};
32pub mod error;
33use crate::error::{S3ExtError, S3ExtResult};
34mod upload;
35
36use async_trait::async_trait;
37use log::debug;
38use rusoto_core::{
39 request::{HttpClient, TlsError},
40 Region,
41};
42use rusoto_credential::StaticProvider;
43use rusoto_s3::{
44 CompleteMultipartUploadOutput, GetObjectOutput, GetObjectRequest, PutObjectOutput,
45 PutObjectRequest, S3Client, StreamingBody, S3,
46};
47use std::{convert::AsRef, path::Path};
48use tokio::{
49 fs::{File, OpenOptions},
50 io,
51};
52
53pub fn new_s3client_with_credentials(
55 region: Region,
56 access_key: impl Into<String>,
57 secret_key: impl Into<String>,
58) -> Result<S3Client, TlsError> {
59 Ok(S3Client::new_with(
60 HttpClient::new()?,
61 StaticProvider::new_minimal(access_key.into(), secret_key.into()),
62 region,
63 ))
64}
65
66#[async_trait]
67pub trait S3Ext {
68 async fn download_to_file<F>(
70 &self,
71 source: GetObjectRequest,
72 target: F,
73 ) -> S3ExtResult<GetObjectOutput>
74 where
75 F: AsRef<Path> + Send + Sync;
76
77 async fn upload_from_file<F>(
86 &self,
87 source: F,
88 target: PutObjectRequest,
89 ) -> S3ExtResult<PutObjectOutput>
90 where
91 F: AsRef<Path> + Send + Sync;
92
93 async fn upload_from_file_multipart<F>(
102 &self,
103 source: F,
104 target: PutObjectRequest,
105 part_size: usize,
106 ) -> S3ExtResult<CompleteMultipartUploadOutput>
107 where
108 F: AsRef<Path> + Send + Sync;
109
110 async fn download<W>(
112 &self,
113 source: GetObjectRequest,
114 target: &mut W,
115 ) -> S3ExtResult<GetObjectOutput>
116 where
117 W: io::AsyncWrite + Unpin + Send;
118
119 async fn upload<R>(
128 &self,
129 source: &mut R,
130 target: PutObjectRequest,
131 ) -> S3ExtResult<PutObjectOutput>
132 where
133 R: io::AsyncRead + Unpin + Send;
134
135 async fn upload_multipart<R>(
144 &self,
145 source: &mut R,
146 target: PutObjectRequest,
147 part_size: usize,
148 ) -> S3ExtResult<CompleteMultipartUploadOutput>
149 where
150 R: io::AsyncRead + Unpin + Send;
151
152 fn stream_objects(&self, bucket: impl Into<String>) -> ObjectStream;
158
159 fn stream_objects_with_prefix(
163 &self,
164 bucket: impl Into<String>,
165 prefix: impl Into<String>,
166 ) -> ObjectStream;
167
168 fn stream_get_objects(&self, bucket: impl Into<String>) -> GetObjectStream;
172
173 fn stream_get_objects_with_prefix(
177 &self,
178 bucket: impl Into<String>,
179 prefix: impl Into<String>,
180 ) -> GetObjectStream;
181}
182
183#[async_trait]
184impl S3Ext for S3Client {
185 async fn download_to_file<F>(
186 &self,
187 source: GetObjectRequest,
188 target: F,
189 ) -> Result<GetObjectOutput, S3ExtError>
190 where
191 F: AsRef<Path> + Send + Sync,
192 {
193 debug!("downloading to file {:?}", target.as_ref());
194 let mut resp = self.get_object(source).await?;
195 let body = resp.body.take().expect("no body");
196 let mut target = OpenOptions::new()
197 .write(true)
198 .create_new(true)
199 .open(target)
200 .await?;
201 copy(body, &mut target).await?;
202 Ok(resp)
203 }
204
205 #[inline]
206 async fn upload_from_file<F>(
207 &self,
208 source: F,
209 target: PutObjectRequest,
210 ) -> S3ExtResult<PutObjectOutput>
211 where
212 F: AsRef<Path> + Send + Sync,
213 {
214 debug!("uploading file {:?}", source.as_ref());
215 let mut source = File::open(source).await?;
216 upload::upload(self, &mut source, target).await
217 }
218
219 #[inline]
220 async fn upload_from_file_multipart<F>(
221 &self,
222 source: F,
223 target: PutObjectRequest,
224 part_size: usize,
225 ) -> S3ExtResult<CompleteMultipartUploadOutput>
226 where
227 F: AsRef<Path> + Send + Sync,
228 {
229 debug!("uploading file {:?}", source.as_ref());
230 let mut source = File::open(source).await?;
231 upload::upload_multipart(self, &mut source, target, part_size).await
232 }
233
234 async fn download<W>(
235 &self,
236 source: GetObjectRequest,
237 mut target: &mut W,
238 ) -> S3ExtResult<GetObjectOutput>
239 where
240 W: io::AsyncWrite + Unpin + Send,
241 {
242 let mut resp = self.get_object(source).await?;
243 let body = resp.body.take().expect("no body");
244 copy(body, &mut target).await?;
245 Ok(resp)
246 }
247
248 #[inline]
249 async fn upload<R>(
250 &self,
251 source: &mut R,
252 target: PutObjectRequest,
253 ) -> S3ExtResult<PutObjectOutput>
254 where
255 R: io::AsyncRead + Unpin + Send,
256 {
257 upload::upload(self, source, target).await
258 }
259
260 #[inline]
261 async fn upload_multipart<R>(
262 &self,
263 mut source: &mut R,
264 target: PutObjectRequest,
265 part_size: usize,
266 ) -> S3ExtResult<CompleteMultipartUploadOutput>
267 where
268 R: io::AsyncRead + Unpin + Send,
269 {
270 upload::upload_multipart(self, &mut source, target, part_size).await
271 }
272
273 #[inline]
274 fn stream_objects(&self, bucket: impl Into<String>) -> ObjectStream {
275 ObjectStream::new(self, bucket, None as Option<&str>)
276 }
277
278 #[inline]
279 fn stream_objects_with_prefix(
280 &self,
281 bucket: impl Into<String>,
282 prefix: impl Into<String>,
283 ) -> ObjectStream {
284 ObjectStream::new(self, bucket, Some(prefix))
285 }
286
287 #[inline]
288 fn stream_get_objects(&self, bucket: impl Into<String>) -> GetObjectStream {
289 GetObjectStream::new(self, bucket, None as Option<&str>)
290 }
291
292 #[inline]
293 fn stream_get_objects_with_prefix(
294 &self,
295 bucket: impl Into<String>,
296 prefix: impl Into<String>,
297 ) -> GetObjectStream {
298 GetObjectStream::new(self, bucket, Some(prefix))
299 }
300}
301
302async fn copy<W>(src: StreamingBody, dest: &mut W) -> S3ExtResult<()>
303where
304 W: io::AsyncWrite + Unpin + Send,
305{
306 io::copy(&mut src.into_async_read(), dest).await?;
307 Ok(())
308}