s3_ext/
lib.rs

1//! Simpler Simple Storage Service: high-level API extensions for Rusoto's
2//! `S3Client`
3//!
4//! # TLS Support
5//!
6//! For TLS support [`rustls`](https://crates.io/crates/rustls) is used by default.
7//!
8//! Alternatively, [`native-tls`](https://crates.io/crates/native-tls) can be used by
9//! disabling the default features and enabling the `native-tls` feature.
10//!
11//! For instance, like this:
12//!
13//! ```toml
14//! [dependencies]
15//! s3_ext = { version = "…", default_features = false }
16//!
17//! [features]
18//! default = ["s3_ext/native-tls"]
19//! ```
20
21#![allow(clippy::default_trait_access)]
22#![allow(clippy::module_name_repetitions)]
23#![allow(clippy::redundant_closure)]
24#![allow(clippy::redundant_closure_for_method_calls)]
25#![allow(clippy::should_implement_trait)]
26#![allow(clippy::must_use_candidate)]
27#![allow(clippy::missing_errors_doc)]
28#![allow(clippy::type_repetition_in_bounds)]
29
30pub mod iter;
31use crate::iter::{GetObjectStream, ObjectStream};
32pub mod error;
33use crate::error::{S3ExtError, S3ExtResult};
34mod upload;
35
36use async_trait::async_trait;
37use log::debug;
38use rusoto_core::{
39    request::{HttpClient, TlsError},
40    Region,
41};
42use rusoto_credential::StaticProvider;
43use rusoto_s3::{
44    CompleteMultipartUploadOutput, GetObjectOutput, GetObjectRequest, PutObjectOutput,
45    PutObjectRequest, S3Client, StreamingBody, S3,
46};
47use std::{convert::AsRef, path::Path};
48use tokio::{
49    fs::{File, OpenOptions},
50    io,
51};
52
53/// Create client using given static access/secret keys
54pub fn new_s3client_with_credentials(
55    region: Region,
56    access_key: impl Into<String>,
57    secret_key: impl Into<String>,
58) -> Result<S3Client, TlsError> {
59    Ok(S3Client::new_with(
60        HttpClient::new()?,
61        StaticProvider::new_minimal(access_key.into(), secret_key.into()),
62        region,
63    ))
64}
65
66#[async_trait]
67pub trait S3Ext {
68    /// Get object and write it to file `target`
69    async fn download_to_file<F>(
70        &self,
71        source: GetObjectRequest,
72        target: F,
73    ) -> S3ExtResult<GetObjectOutput>
74    where
75        F: AsRef<Path> + Send + Sync;
76
77    /// Upload content of file to S3
78    ///
79    /// # Caveats
80    ///
81    /// The current implementation is incomplete. For now, the following
82    /// limitation applies:
83    ///
84    /// * The full content of `source` is copied into memory.
85    async fn upload_from_file<F>(
86        &self,
87        source: F,
88        target: PutObjectRequest,
89    ) -> S3ExtResult<PutObjectOutput>
90    where
91        F: AsRef<Path> + Send + Sync;
92
93    /// Upload content of file to S3 using multi-part upload
94    ///
95    /// # Caveats
96    ///
97    /// The current implementation is incomplete. For now, the following
98    /// limitation applies:
99    ///
100    /// * The full content of a part is copied into memory.
101    async fn upload_from_file_multipart<F>(
102        &self,
103        source: F,
104        target: PutObjectRequest,
105        part_size: usize,
106    ) -> S3ExtResult<CompleteMultipartUploadOutput>
107    where
108        F: AsRef<Path> + Send + Sync;
109
110    /// Get object and write it to `target`
111    async fn download<W>(
112        &self,
113        source: GetObjectRequest,
114        target: &mut W,
115    ) -> S3ExtResult<GetObjectOutput>
116    where
117        W: io::AsyncWrite + Unpin + Send;
118
119    /// Read `source` and upload it to S3
120    ///
121    /// # Caveats
122    ///
123    /// The current implementation is incomplete. For now, the following
124    /// limitation applies:
125    ///
126    /// * The full content of `source` is copied into memory.
127    async fn upload<R>(
128        &self,
129        source: &mut R,
130        target: PutObjectRequest,
131    ) -> S3ExtResult<PutObjectOutput>
132    where
133        R: io::AsyncRead + Unpin + Send;
134
135    /// Read `source` and upload it to S3 using multi-part upload
136    ///
137    /// # Caveats
138    ///
139    /// The current implementation is incomplete. For now, the following
140    /// limitation applies:
141    ///
142    /// * The full content of a part is copied into memory.
143    async fn upload_multipart<R>(
144        &self,
145        source: &mut R,
146        target: PutObjectRequest,
147        part_size: usize,
148    ) -> S3ExtResult<CompleteMultipartUploadOutput>
149    where
150        R: io::AsyncRead + Unpin + Send;
151
152    /// Stream over all objects
153    /// Access to an iterator-like object `ObjectIter` can be obtained by
154    /// calling into_iter()
155    ///
156    /// Objects are lexicographically sorted by their key.
157    fn stream_objects(&self, bucket: impl Into<String>) -> ObjectStream;
158
159    /// Stream over objects with given `prefix`
160    ///
161    /// Objects are lexicographically sorted by their key.
162    fn stream_objects_with_prefix(
163        &self,
164        bucket: impl Into<String>,
165        prefix: impl Into<String>,
166    ) -> ObjectStream;
167
168    /// Stream over all objects; fetching objects as needed
169    ///
170    /// Objects are lexicographically sorted by their key.
171    fn stream_get_objects(&self, bucket: impl Into<String>) -> GetObjectStream;
172
173    /// Stream over objects with given `prefix`; fetching objects as needed
174    ///
175    /// Objects are lexicographically sorted by their key.
176    fn stream_get_objects_with_prefix(
177        &self,
178        bucket: impl Into<String>,
179        prefix: impl Into<String>,
180    ) -> GetObjectStream;
181}
182
183#[async_trait]
184impl S3Ext for S3Client {
185    async fn download_to_file<F>(
186        &self,
187        source: GetObjectRequest,
188        target: F,
189    ) -> Result<GetObjectOutput, S3ExtError>
190    where
191        F: AsRef<Path> + Send + Sync,
192    {
193        debug!("downloading to file {:?}", target.as_ref());
194        let mut resp = self.get_object(source).await?;
195        let body = resp.body.take().expect("no body");
196        let mut target = OpenOptions::new()
197            .write(true)
198            .create_new(true)
199            .open(target)
200            .await?;
201        copy(body, &mut target).await?;
202        Ok(resp)
203    }
204
205    #[inline]
206    async fn upload_from_file<F>(
207        &self,
208        source: F,
209        target: PutObjectRequest,
210    ) -> S3ExtResult<PutObjectOutput>
211    where
212        F: AsRef<Path> + Send + Sync,
213    {
214        debug!("uploading file {:?}", source.as_ref());
215        let mut source = File::open(source).await?;
216        upload::upload(self, &mut source, target).await
217    }
218
219    #[inline]
220    async fn upload_from_file_multipart<F>(
221        &self,
222        source: F,
223        target: PutObjectRequest,
224        part_size: usize,
225    ) -> S3ExtResult<CompleteMultipartUploadOutput>
226    where
227        F: AsRef<Path> + Send + Sync,
228    {
229        debug!("uploading file {:?}", source.as_ref());
230        let mut source = File::open(source).await?;
231        upload::upload_multipart(self, &mut source, target, part_size).await
232    }
233
234    async fn download<W>(
235        &self,
236        source: GetObjectRequest,
237        mut target: &mut W,
238    ) -> S3ExtResult<GetObjectOutput>
239    where
240        W: io::AsyncWrite + Unpin + Send,
241    {
242        let mut resp = self.get_object(source).await?;
243        let body = resp.body.take().expect("no body");
244        copy(body, &mut target).await?;
245        Ok(resp)
246    }
247
248    #[inline]
249    async fn upload<R>(
250        &self,
251        source: &mut R,
252        target: PutObjectRequest,
253    ) -> S3ExtResult<PutObjectOutput>
254    where
255        R: io::AsyncRead + Unpin + Send,
256    {
257        upload::upload(self, source, target).await
258    }
259
260    #[inline]
261    async fn upload_multipart<R>(
262        &self,
263        mut source: &mut R,
264        target: PutObjectRequest,
265        part_size: usize,
266    ) -> S3ExtResult<CompleteMultipartUploadOutput>
267    where
268        R: io::AsyncRead + Unpin + Send,
269    {
270        upload::upload_multipart(self, &mut source, target, part_size).await
271    }
272
273    #[inline]
274    fn stream_objects(&self, bucket: impl Into<String>) -> ObjectStream {
275        ObjectStream::new(self, bucket, None as Option<&str>)
276    }
277
278    #[inline]
279    fn stream_objects_with_prefix(
280        &self,
281        bucket: impl Into<String>,
282        prefix: impl Into<String>,
283    ) -> ObjectStream {
284        ObjectStream::new(self, bucket, Some(prefix))
285    }
286
287    #[inline]
288    fn stream_get_objects(&self, bucket: impl Into<String>) -> GetObjectStream {
289        GetObjectStream::new(self, bucket, None as Option<&str>)
290    }
291
292    #[inline]
293    fn stream_get_objects_with_prefix(
294        &self,
295        bucket: impl Into<String>,
296        prefix: impl Into<String>,
297    ) -> GetObjectStream {
298        GetObjectStream::new(self, bucket, Some(prefix))
299    }
300}
301
302async fn copy<W>(src: StreamingBody, dest: &mut W) -> S3ExtResult<()>
303where
304    W: io::AsyncWrite + Unpin + Send,
305{
306    io::copy(&mut src.into_async_read(), dest).await?;
307    Ok(())
308}