cobalt_aws/s3/
mod.rs

1//! A collection of wrappers around the [aws_sdk_s3](https://docs.rs/aws-sdk-s3/latest/aws_sdk_s3/) crate.
2
3// Standard library imports
4use std::pin::Pin;
5use std::{fmt::Debug, io::Error};
6
7// External crates
8use anyhow::Result;
9use aws_sdk_s3::{
10    operation::{get_object::GetObjectError, list_objects_v2::ListObjectsV2Error},
11    primitives::ByteStream,
12    types::Object,
13};
14use aws_smithy_async::future::pagination_stream::{PaginationStream, TryFlatMap};
15use bytes::Bytes;
16use futures::{
17    stream::{IntoAsyncRead, Stream},
18    task::{Context, Poll},
19    TryStreamExt,
20};
21
22// Internal project imports
23use crate::types::SdkError;
24
25/// Re-export of [aws_sdk_s3::client::Client](https://docs.rs/aws-sdk-s3/latest/aws_sdk_s3/client/struct.Client.html).
26///
27pub use aws_sdk_s3::Client;
28
29mod async_multipart_put_object;
30mod async_put_object;
31mod multipartcopy;
32mod s3_object;
33pub use async_multipart_put_object::AsyncMultipartUpload;
34pub use async_put_object::AsyncPutObject;
35pub use multipartcopy::{PartSize, S3MultipartCopier, S3MultipartCopierError};
36pub use s3_object::S3Object;
37pub use s3_object::S3ObjectError;
38
39/// `FuturesStreamCompatByteStream` is a compatibility layer struct designed to wrap
40/// `ByteStream` from the `aws_sdk_s3`. This wrapper enables the use of `ByteStream`
41/// with the `futures::Stream` trait, which is necessary for integration with libraries
42/// that rely on the futures crate, such as `cobalt-aws`.
43///
44/// # Why
45/// The `aws_sdk_s3` uses Tokio's async model and exposes streams (such as `ByteStream`)
46/// that are specific to Tokio's ecosystem. However, the `cobalt-aws` library operates
47/// on the futures crate's async model. `FuturesStreamCompatByteStream` bridges this gap,
48/// allowing `ByteStream` to be used where a `futures::Stream` is required, ensuring
49/// compatibility and interoperability between these two different async ecosystems.
50#[derive(Debug, Default)]
51pub struct FuturesStreamCompatByteStream(ByteStream);
52
53impl From<ByteStream> for FuturesStreamCompatByteStream {
54    fn from(value: ByteStream) -> Self {
55        FuturesStreamCompatByteStream(value)
56    }
57}
58
59impl Stream for FuturesStreamCompatByteStream {
60    type Item = Result<Bytes, Error>;
61
62    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        Pin::new(&mut self.0)
64            .poll_next(cx)
65            .map_err(std::io::Error::other)
66    }
67}
68
69/// `FuturesPaginationStream` is a struct that wraps the `PaginationStream` from
70/// `aws_smithy_async::future::pagination_stream`, adapting it to implement the `Stream`
71/// trait from the `futures` crate.
72///
73/// # Why
74/// `PaginationStream` in `aws_smithy_async` is designed to be runtime-agnostic and
75/// does not natively implement the `futures::Stream` trait. `FuturesPaginationStream`
76/// provides this implementation, making `PaginationStream` compatible with the futures-based
77/// asynchronous model used in libraries like `cobalt-aws`.
78///
79/// This adaptation is essential in scenarios where `cobalt-aws`, which relies on the
80/// `futures` crate, needs to work with the AWS SDK's pagination streams. It bridges
81/// the gap between different async runtimes and libraries, ensuring smoother integration
82/// and functionality in Rust async applications that rely on the futures ecosystem.
83///
84struct FuturesPaginiationStream<I>(PaginationStream<I>);
85
86impl<I> From<PaginationStream<I>> for FuturesPaginiationStream<I> {
87    fn from(value: PaginationStream<I>) -> Self {
88        FuturesPaginiationStream(value)
89    }
90}
91
92impl<I> Stream for FuturesPaginiationStream<I> {
93    type Item = I;
94
95    fn poll_next(
96        mut self: std::pin::Pin<&mut Self>,
97        cx: &mut std::task::Context<'_>,
98    ) -> std::task::Poll<Option<Self::Item>> {
99        Pin::new(&mut self.0).poll_next(cx)
100    }
101}
102
103/// Perform a bucket listing, returning a stream of results.
104///
105/// # Example
106///
107/// ```no_run
108/// use aws_config;
109/// use cobalt_aws::s3::{Client, list_objects};
110/// use cobalt_aws::config::load_from_env;
111/// use futures::TryStreamExt;
112///
113/// # tokio_test::block_on(async {
114/// let shared_config = load_from_env().await.unwrap();
115/// let client = Client::new(&shared_config);
116/// let mut objects = list_objects(&client, "my-bucket", Some("prefix".into()));
117/// while let Some(item) = objects.try_next().await.unwrap() {
118///     println!("{:?}", item);
119/// }
120/// # })
121/// ```
122///
123/// # Implementation details
124///
125/// This function uses the [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
126/// API and performs pagination to ensure all objects are returned.
127pub fn list_objects(
128    client: &Client,
129    bucket: impl Into<String>,
130    prefix: Option<String>,
131) -> impl Stream<Item = Result<Object, SdkError<ListObjectsV2Error>>> + Unpin {
132    let req = client
133        .list_objects_v2()
134        .bucket(bucket)
135        .set_prefix(prefix)
136        .into_paginator();
137    let flatend_stream = TryFlatMap::new(req.send()).flat_map(|x| x.contents.unwrap_or_default());
138    FuturesPaginiationStream::from(flatend_stream)
139}
140
141/// An `AsyncBufRead` compatible reader over an S3 object byte stream.
142pub type S3AsyncBufReader = IntoAsyncRead<FuturesStreamCompatByteStream>;
143
144/// Retrieve an object from S3 as an `AsyncBufRead`.
145///
146/// # Example
147///
148/// ```no_run
149/// use aws_config;
150/// use cobalt_aws::s3::{Client, get_object};
151/// use cobalt_aws::config::load_from_env;
152/// use futures::AsyncReadExt;
153///
154/// # tokio_test::block_on(async {
155/// let shared_config = load_from_env().await.unwrap();
156/// let client = Client::new(&shared_config);
157/// let mut reader = get_object(&client, "my-bucket", "my-key").await.unwrap();
158/// let mut buffer = String::new();
159/// reader.read_to_string(&mut buffer).await.unwrap();
160/// println!("{}", buffer);
161/// # })
162/// ```
163pub async fn get_object(
164    client: &Client,
165    bucket: &str,
166    key: &str,
167) -> Result<S3AsyncBufReader, SdkError<GetObjectError>> {
168    let req = client.get_object().bucket(bucket).key(key);
169    let resp = req.send().await?;
170    Ok(FuturesStreamCompatByteStream::from(resp.body).into_async_read())
171}
172
#[cfg(test)]
mod test {
    //! Helpers shared by the S3 test suites.

    use super::*;
    use crate::s3::S3Object;
    use anyhow::Result;
    use aws_sdk_s3::{
        operation::create_bucket::CreateBucketError,
        types::{BucketLocationConstraint, CreateBucketConfiguration},
        Client,
    };
    use rand::distr::{Alphanumeric, SampleString};
    use rand::Rng;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Create `bucket` with the `ap-southeast-2` location constraint.
    ///
    /// A `BucketAlreadyOwnedByYou` service error is treated as success, so the helper
    /// can be called repeatedly for the same bucket. Any other failure is returned as
    /// an `anyhow::Error`.
    pub async fn create_bucket(client: &Client, bucket: &str) -> Result<()> {
        let config = CreateBucketConfiguration::builder()
            .location_constraint(BucketLocationConstraint::ApSoutheast2)
            .build();
        let outcome = client
            .create_bucket()
            .bucket(bucket)
            .create_bucket_configuration(config)
            .send()
            .await;

        let Err(err) = outcome else {
            return Ok(());
        };

        // Only the "already owned by you" service error is tolerated.
        let already_owned = matches!(
            &err,
            SdkError::ServiceError(context)
                if matches!(context.err(), CreateBucketError::BucketAlreadyOwnedByYou(_))
        );
        if already_owned {
            Ok(())
        } else {
            Err(anyhow::Error::from(err))
        }
    }

    /// Build a random number generator whose seed is derived from the hash of `seed`.
    pub fn seeded_rng<H: Hash + ?Sized>(seed: &H) -> impl Rng {
        let mut state = DefaultHasher::new();
        seed.hash(&mut state);
        let digest = state.finish();
        ChaCha8Rng::seed_from_u64(digest)
    }

    /// Produce a 16-character alphanumeric name drawn from `rng`.
    pub fn gen_random_file_name<R: Rng>(rng: &mut R) -> String {
        const NAME_LEN: usize = 16;
        Alphanumeric.sample_string(rng, NAME_LEN)
    }

    /// Download the full body of `obj` into memory.
    ///
    /// # Panics
    ///
    /// Panics if the object cannot be fetched or its body cannot be collected.
    pub async fn fetch_bytes(client: &Client, obj: &S3Object) -> Result<Vec<u8>> {
        let response = client
            .get_object()
            .bucket(&obj.bucket)
            .key(&obj.key)
            .send()
            .await
            .expect("Expected dst key to exist");
        let aggregated = response.body.collect().await.expect("Expected a body");
        Ok(aggregated.into_bytes().to_vec())
    }
}
239
#[cfg(test)]
mod test_list_objects {
    //! Integration tests for `list_objects`, run against a localstack S3 endpoint.

    use super::*;
    use crate::localstack;
    use futures::TryStreamExt;
    use serial_test::serial;
    use std::error::Error;
    use tokio;

    /// Wait for localstack, then build a path-style S3 client from the environment.
    async fn localstack_test_client() -> Client {
        localstack::test_utils::wait_for_localstack().await;
        let shared_config = crate::config::load_from_env().await.unwrap();
        let s3_config = aws_sdk_s3::config::Builder::from(&shared_config)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    }

    /// Run `list_objects` for `bucket`/`prefix` and gather the whole stream.
    ///
    /// Returns the first error yielded by the stream, if any.
    async fn list_all(
        bucket: &str,
        prefix: Option<&str>,
    ) -> Result<Vec<Object>, SdkError<ListObjectsV2Error>> {
        let client = localstack_test_client().await;
        list_objects(&client, bucket, prefix.map(String::from))
            .try_collect()
            .await
    }

    /// Assert that `results` is exactly the two fixture objects under `some-prefix`.
    fn assert_some_prefix_listing(results: &[Object]) {
        assert_eq!(results.len(), 2);
        assert_eq!(
            results[0].key,
            Some("some-prefix/nested-prefix/nested.txt".into())
        );
        assert_eq!(results[0].size, Some(12));
        assert_eq!(results[1].key, Some("some-prefix/prefixed.txt".into()));
        assert_eq!(results[1].size, Some(14));
    }

    /// Listing a missing bucket surfaces `NoSuchBucket` as the error source.
    #[tokio::test]
    #[serial]
    async fn test_non_existent_bucket() {
        let e = list_all("non-existent-bucket", None).await.unwrap_err();
        assert!(matches!(
            e.source()
                .unwrap()
                .downcast_ref::<ListObjectsV2Error>()
                .unwrap(),
            ListObjectsV2Error::NoSuchBucket(_)
        ))
    }

    /// An empty bucket yields an empty listing rather than an error.
    #[tokio::test]
    #[serial]
    async fn test_empty_bucket() {
        let results = list_all("empty-bucket", None).await.unwrap();
        assert_eq!(results, vec![]);
    }

    /// Without a prefix, every object in the bucket is returned.
    #[tokio::test]
    #[serial]
    async fn test_no_prefix() {
        // Only the total count is checked, so the listing does not need sorting.
        let results = list_all("test-bucket", None).await.unwrap();
        assert_eq!(results.len(), 2503);
    }

    /// A prefix without a trailing slash matches the objects beneath it.
    #[tokio::test]
    #[serial]
    async fn test_with_prefix() {
        let results = list_all("test-bucket", Some("some-prefix")).await.unwrap();
        assert_some_prefix_listing(&results);
    }

    /// A trailing slash on the prefix gives the same listing.
    #[tokio::test]
    #[serial]
    async fn test_with_prefix_slash() {
        let results = list_all("test-bucket", Some("some-prefix/"))
            .await
            .unwrap();
        assert_some_prefix_listing(&results);
    }

    /// A nested prefix narrows the listing to the single nested object.
    #[tokio::test]
    #[serial]
    async fn test_with_nested_prefix() {
        let results = list_all("test-bucket", Some("some-prefix/nested-prefix"))
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].key,
            Some("some-prefix/nested-prefix/nested.txt".into())
        );
        assert_eq!(results[0].size, Some(12));
    }

    /// The `empty-pre` prefix matches no objects.
    #[tokio::test]
    #[serial]
    async fn test_with_partial_prefix() {
        let results = list_all("test-bucket", Some("empty-pre")).await.unwrap();
        assert_eq!(results.len(), 0);
    }

    /// The `empty-prefix` prefix matches no objects.
    #[tokio::test]
    #[serial]
    async fn test_with_empty_prefix() {
        let results = list_all("test-bucket", Some("empty-prefix")).await.unwrap();
        assert_eq!(results.len(), 0);
    }

    /// All 2500 objects under `multi-page` are returned by a single listing.
    #[tokio::test]
    #[serial]
    async fn test_with_multiple_pages() {
        let results = list_all("test-bucket", Some("multi-page")).await.unwrap();
        assert_eq!(results.len(), 2500);
    }
}
#[cfg(test)]
mod test_get_object {
    //! Integration tests for `get_object`, run against a localstack S3 endpoint.

    use super::*;
    use crate::localstack;
    use aws_sdk_s3::error::ProvideErrorMetadata;
    use futures::AsyncReadExt;
    use serial_test::serial;
    use std::error::Error;
    use tokio;

    /// Wait for localstack, then build a path-style S3 client from the environment.
    async fn localstack_test_client() -> Client {
        localstack::test_utils::wait_for_localstack().await;
        let sdk_config = crate::config::load_from_env().await.unwrap();
        let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    }

    /// Fetching from a missing bucket reports the `NoSuchBucket` error code.
    #[tokio::test]
    #[serial]
    async fn test_non_existent_bucket() {
        let client = localstack_test_client().await;
        let Err(err) = get_object(&client, "non-existent-bucket", "my-object").await else {
            panic!("Expected an error, but got Ok");
        };
        let service_err = err
            .source()
            .unwrap()
            .downcast_ref::<GetObjectError>()
            .unwrap();

        assert_eq!(service_err.code(), Some("NoSuchBucket"));
    }

    /// Fetching a missing key from an existing bucket reports `NoSuchKey`.
    #[tokio::test]
    #[serial]
    async fn test_non_existent_key() {
        let client = localstack_test_client().await;
        let Err(err) = get_object(&client, "test-bucket", "non-existing-object").await else {
            panic!("Expected an error, but got Ok");
        };
        let service_err = err
            .source()
            .unwrap()
            .downcast_ref::<GetObjectError>()
            .unwrap();

        assert!(matches!(service_err, GetObjectError::NoSuchKey(_)));
    }

    /// An existing object can be read to completion through the returned reader.
    #[tokio::test]
    #[serial]
    async fn test_existing_key() {
        let client = localstack_test_client().await;
        let fetched = get_object(&client, "test-bucket", "test.txt").await;
        let mut reader = fetched.unwrap();

        let mut contents = String::new();
        let bytes_read = reader.read_to_string(&mut contents).await.unwrap();
        assert_eq!(contents, "test data\n");
        assert_eq!(bytes_read, 10);
    }
}