1use std::pin::Pin;
5use std::{fmt::Debug, io::Error};
6
7use anyhow::Result;
9use aws_sdk_s3::{
10 operation::{get_object::GetObjectError, list_objects_v2::ListObjectsV2Error},
11 primitives::ByteStream,
12 types::Object,
13};
14use aws_smithy_async::future::pagination_stream::{PaginationStream, TryFlatMap};
15use bytes::Bytes;
16use futures::{
17 stream::{IntoAsyncRead, Stream},
18 task::{Context, Poll},
19 TryStreamExt,
20};
21
22use crate::types::SdkError;
24
25pub use aws_sdk_s3::Client;
28
29mod async_multipart_put_object;
30mod async_put_object;
31mod multipartcopy;
32mod s3_object;
33pub use async_multipart_put_object::AsyncMultipartUpload;
34pub use async_put_object::AsyncPutObject;
35pub use multipartcopy::{PartSize, S3MultipartCopier, S3MultipartCopierError};
36pub use s3_object::S3Object;
37pub use s3_object::S3ObjectError;
38
39#[derive(Debug, Default)]
51pub struct FuturesStreamCompatByteStream(ByteStream);
52
53impl From<ByteStream> for FuturesStreamCompatByteStream {
54 fn from(value: ByteStream) -> Self {
55 FuturesStreamCompatByteStream(value)
56 }
57}
58
59impl Stream for FuturesStreamCompatByteStream {
60 type Item = Result<Bytes, Error>;
61
62 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 Pin::new(&mut self.0)
64 .poll_next(cx)
65 .map_err(std::io::Error::other)
66 }
67}
68
69struct FuturesPaginiationStream<I>(PaginationStream<I>);
85
86impl<I> From<PaginationStream<I>> for FuturesPaginiationStream<I> {
87 fn from(value: PaginationStream<I>) -> Self {
88 FuturesPaginiationStream(value)
89 }
90}
91
92impl<I> Stream for FuturesPaginiationStream<I> {
93 type Item = I;
94
95 fn poll_next(
96 mut self: std::pin::Pin<&mut Self>,
97 cx: &mut std::task::Context<'_>,
98 ) -> std::task::Poll<Option<Self::Item>> {
99 Pin::new(&mut self.0).poll_next(cx)
100 }
101}
102
103pub fn list_objects(
128 client: &Client,
129 bucket: impl Into<String>,
130 prefix: Option<String>,
131) -> impl Stream<Item = Result<Object, SdkError<ListObjectsV2Error>>> + Unpin {
132 let req = client
133 .list_objects_v2()
134 .bucket(bucket)
135 .set_prefix(prefix)
136 .into_paginator();
137 let flatend_stream = TryFlatMap::new(req.send()).flat_map(|x| x.contents.unwrap_or_default());
138 FuturesPaginiationStream::from(flatend_stream)
139}
140
141pub type S3AsyncBufReader = IntoAsyncRead<FuturesStreamCompatByteStream>;
143
144pub async fn get_object(
164 client: &Client,
165 bucket: &str,
166 key: &str,
167) -> Result<S3AsyncBufReader, SdkError<GetObjectError>> {
168 let req = client.get_object().bucket(bucket).key(key);
169 let resp = req.send().await?;
170 Ok(FuturesStreamCompatByteStream::from(resp.body).into_async_read())
171}
172
#[cfg(test)]
mod test {
    //! Helpers for S3 tests.
    use super::*;
    use crate::s3::S3Object;
    use anyhow::Context as _;
    use anyhow::Result;
    use aws_sdk_s3::{
        operation::create_bucket::CreateBucketError,
        types::{BucketLocationConstraint, CreateBucketConfiguration},
        Client,
    };
    use rand::distr::{Alphanumeric, SampleString};
    use rand::Rng;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Creates `bucket` with an `ApSoutheast2` location constraint.
    ///
    /// A `BucketAlreadyOwnedByYou` service error is treated as success, so calling this for a
    /// bucket that already exists under the same account is not an error.
    ///
    /// # Errors
    /// Any other `CreateBucket` failure is returned as an [`anyhow::Error`].
    pub async fn create_bucket(client: &Client, bucket: &str) -> Result<()> {
        let constraint = CreateBucketConfiguration::builder()
            .location_constraint(BucketLocationConstraint::ApSoutheast2)
            .build();
        let outcome = client
            .create_bucket()
            .bucket(bucket)
            .create_bucket_configuration(constraint)
            .send()
            .await;
        match outcome {
            Ok(_) => Ok(()),
            Err(SdkError::ServiceError(ref context))
                if matches!(
                    context.err(),
                    CreateBucketError::BucketAlreadyOwnedByYou(_)
                ) =>
            {
                Ok(())
            }
            Err(e) => Err(anyhow::Error::from(e)),
        }
    }

    /// Returns a `ChaCha8Rng` seeded from the hash of `seed`, so the same seed gives the same
    /// sequence within a build.
    ///
    /// NOTE: `DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust
    /// releases, so sequences may differ between toolchains.
    pub fn seeded_rng<H: Hash + ?Sized>(seed: &H) -> impl Rng {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        ChaCha8Rng::seed_from_u64(hasher.finish())
    }

    /// Returns a random 16-character alphanumeric string drawn from `rng`.
    pub fn gen_random_file_name<R: Rng>(rng: &mut R) -> String {
        Alphanumeric.sample_string(rng, 16)
    }

    /// Downloads the full contents of `obj` into memory.
    ///
    /// # Errors
    /// Returns an error (with context) if the `GetObject` call fails or the body cannot be
    /// collected, instead of panicking as a `Result`-returning helper should.
    pub async fn fetch_bytes(client: &Client, obj: &S3Object) -> Result<Vec<u8>> {
        let output = client
            .get_object()
            .bucket(&obj.bucket)
            .key(&obj.key)
            .send()
            .await
            .context("Expected dst key to exist")?;
        let body = output
            .body
            .collect()
            .await
            .context("Expected a body")?;
        Ok(body.into_bytes().to_vec())
    }
}
239
#[cfg(test)]
mod test_list_objects {
    use super::*;
    use crate::localstack;
    use futures::TryStreamExt;
    use serial_test::serial;
    use std::error::Error;

    /// Builds an S3 client from the environment's shared config with path-style addressing,
    /// after waiting for localstack to become available.
    async fn localstack_test_client() -> Client {
        localstack::test_utils::wait_for_localstack().await;
        let shared_config = crate::config::load_from_env().await.unwrap();
        let builder = aws_sdk_s3::config::Builder::from(&shared_config)
            .force_path_style(true)
            .build();
        Client::from_conf(builder)
    }

    /// Asserts that `results` holds exactly the two fixture objects under `some-prefix/`,
    /// in key order.
    #[track_caller]
    fn assert_some_prefix_objects(results: &[Object]) {
        assert_eq!(results.len(), 2);
        assert_eq!(
            results[0].key,
            Some("some-prefix/nested-prefix/nested.txt".into())
        );
        assert_eq!(results[0].size, Some(12));
        assert_eq!(results[1].key, Some("some-prefix/prefixed.txt".into()));
        assert_eq!(results[1].size, Some(14));
    }

    #[tokio::test]
    #[serial]
    async fn test_non_existent_bucket() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "non-existent-bucket", None);
        let e = stream.try_collect::<Vec<_>>().await.unwrap_err();
        assert!(matches!(
            e.source()
                .unwrap()
                .downcast_ref::<ListObjectsV2Error>()
                .unwrap(),
            ListObjectsV2Error::NoSuchBucket(_)
        ))
    }

    #[tokio::test]
    #[serial]
    async fn test_empty_bucket() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "empty-bucket", None);
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(results, vec![]);
    }

    #[tokio::test]
    #[serial]
    async fn test_no_prefix() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", None);
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        // Only the count is checked, so the order of `results` is irrelevant here.
        assert_eq!(results.len(), 2503);
    }

    #[tokio::test]
    #[serial]
    async fn test_with_prefix() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", Some("some-prefix".into()));
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_some_prefix_objects(&results);
    }

    #[tokio::test]
    #[serial]
    async fn test_with_prefix_slash() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", Some("some-prefix/".into()));
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_some_prefix_objects(&results);
    }

    #[tokio::test]
    #[serial]
    async fn test_with_nested_prefix() {
        let client = localstack_test_client().await;

        let stream = list_objects(
            &client,
            "test-bucket",
            Some("some-prefix/nested-prefix".into()),
        );
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].key,
            Some("some-prefix/nested-prefix/nested.txt".into())
        );
        assert_eq!(results[0].size, Some(12));
    }

    #[tokio::test]
    #[serial]
    async fn test_with_partial_prefix() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", Some("empty-pre".into()));
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_with_empty_prefix() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", Some("empty-prefix".into()));
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_with_multiple_pages() {
        let client = localstack_test_client().await;

        let stream = list_objects(&client, "test-bucket", Some("multi-page".into()));
        let results = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(results.len(), 2500);
    }
}
#[cfg(test)]
mod test_get_object {
    use super::*;
    use crate::localstack;
    use aws_sdk_s3::error::ProvideErrorMetadata;
    use futures::AsyncReadExt;
    use serial_test::serial;
    use std::error::Error;

    /// Builds an S3 client from the environment's shared config with path-style addressing,
    /// after waiting for localstack to become available.
    async fn localstack_test_client() -> Client {
        localstack::test_utils::wait_for_localstack().await;
        let shared_config = crate::config::load_from_env().await.unwrap();
        let s3_config = aws_sdk_s3::config::Builder::from(&shared_config)
            .force_path_style(true)
            .build();
        Client::from_conf(s3_config)
    }

    /// Returns the `GetObjectError` carried as the source of `err`.
    ///
    /// Panics if `err` has no source or the source is not a `GetObjectError`.
    fn service_error(err: &SdkError<GetObjectError>) -> &GetObjectError {
        err.source()
            .unwrap()
            .downcast_ref::<GetObjectError>()
            .unwrap()
    }

    #[tokio::test]
    #[serial]
    async fn test_non_existent_bucket() {
        let client = localstack_test_client().await;
        let Err(err) = get_object(&client, "non-existent-bucket", "my-object").await else {
            panic!("Expected an error, but got Ok");
        };
        assert_eq!(service_error(&err).code(), Some("NoSuchBucket"));
    }

    #[tokio::test]
    #[serial]
    async fn test_non_existent_key() {
        let client = localstack_test_client().await;
        let Err(err) = get_object(&client, "test-bucket", "non-existing-object").await else {
            panic!("Expected an error, but got Ok");
        };
        assert!(matches!(service_error(&err), GetObjectError::NoSuchKey(_)));
    }

    #[tokio::test]
    #[serial]
    async fn test_existing_key() {
        let client = localstack_test_client().await;
        let mut reader = get_object(&client, "test-bucket", "test.txt")
            .await
            .unwrap();

        let mut contents = String::new();
        let read = reader.read_to_string(&mut contents).await.unwrap();

        assert_eq!(contents, "test data\n");
        assert_eq!(read, 10);
    }
}