s3_algo/list_actions.rs
use super::*;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{Delete, Object, ObjectIdentifier};
use aws_smithy_types_convert::stream::PaginationStreamExt;
use futures::future::ok;
use futures::stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io;

/// A stream that can list objects, and (using member functions) delete or copy listed files.
pub struct ListObjects<S> {
    s3: Client,
    config: Config,
    bucket: String,
    /// Common prefix (as requested) of the listed objects. Empty string if all objects were
    /// requested.
    prefix: String,
    stream: S,
}
impl<S> ListObjects<S>
where
    S: Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send + 'static,
{
    /// Box the inner stream, erasing its concrete type.
    pub fn boxed(
        self,
    ) -> ListObjects<Pin<Box<dyn Stream<Item = Result<ListObjectsV2Output, Error>> + Send>>> {
        ListObjects {
            s3: self.s3,
            config: self.config,
            bucket: self.bucket,
            stream: self.stream.boxed(),
            prefix: self.prefix,
        }
    }

    /// Calls an async closure on each individual object of the list operation.
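    ///
    /// A minimal usage sketch (bucket and prefix names are hypothetical; `algo` is
    /// assumed to be an already-constructed `S3Algo`):
    ///
    /// ```ignore
    /// algo.list_prefix("my-bucket".into(), Some("logs/".into()))
    ///     .process(|object| async move {
    ///         println!("found {:?}", object.key);
    ///     })
    ///     .await?;
    /// ```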
    pub async fn process<P, F>(self, f: P) -> Result<(), Error>
    where
        P: Fn(Object) -> F + Clone,
        F: Future<Output = ()>,
    {
        let ListObjects {
            stream, prefix: _, ..
        } = self;
        stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .try_for_each_concurrent(None, move |object| {
                let f = f.clone();
                async move {
                    f(object).await;
                    Ok(())
                }
            })
            .await
    }
    /// Download all listed objects, returning a stream of their contents.
    /// Used as a basis for other `download_all_*` functions.
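    ///
    /// A sketch of consuming the stream (names are hypothetical):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// let mut stream = Box::pin(
    ///     algo.list_prefix("my-bucket".into(), None)
    ///         .download_all_stream(),
    /// );
    /// while let Some((key, body, len)) = stream.try_next().await? {
    ///     // `body` is a not-yet-read `ByteStream`; `len` is the reported content length
    /// }
    /// ```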
    pub fn download_all_stream(
        self,
    ) -> impl Stream<Item = Result<(String, ByteStream, Option<i64>), Error>> {
        let ListObjects {
            s3,
            config: _,
            bucket,
            stream,
            prefix: _,
        } = self;
        stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .map(|result| {
                result.and_then(|obj| {
                    let Object { key, size, .. } = obj;
                    if let Some(key) = key {
                        Ok((key, size))
                    } else {
                        Err(Error::MissingKeyOrSize)
                    }
                })
            })
            .and_then(move |(key, _)| {
                let (s3, bucket) = (s3.clone(), bucket.clone());

                async move {
                    let output = s3
                        .get_object()
                        .bucket(bucket.clone())
                        .key(key.clone())
                        .send()
                        .await
                        .context(err::GetObject {
                            key: key.clone(),
                            bucket,
                        })?;
                    Ok((key, output.body, output.content_length))
                }
            })
    }

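    /// Download all listed objects, reading each body to completion into a `Vec<u8>`.
    ///
    /// A sketch (names are hypothetical):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// let files: Vec<(String, Vec<u8>)> = algo
    ///     .list_prefix("my-bucket".into(), Some("data/".into()))
    ///     .download_all_to_vec()
    ///     .try_collect()
    ///     .await?;
    /// ```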
    pub fn download_all_to_vec(self) -> impl Stream<Item = Result<(String, Vec<u8>), Error>> {
        self.download_all_stream()
            .and_then(|(key, body, _)| async move {
                let mut contents = vec![];
                io::copy(&mut body.into_async_read(), &mut contents)
                    .await
                    .context(err::TokioIo)?;
                Ok((key, contents))
            })
    }

    /*
    /// Download all listed objects to file system.
    /// UNIMPLEMENTED.
    pub fn download_all(self) -> impl Future<Output = Result<(), Error>> {
        // TODO use download_all_stream
        ok(unimplemented!())
    }
    */

    /// Delete all listed objects.
    ///
    /// With the two arguments, you can implement a detailed real-time progress report of both how
    /// many files have been listed and how many files have been deleted.
    ///
    /// `list_progress`: Closure that is given the number of files listed as its argument. It is
    /// called several times, once for each batch of files listed.
    /// `delete_progress`: Closure that is given the `RequestReport` of a delete request. The
    /// `size` field refers to the number of files deleted.
    ///
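    /// A minimal usage sketch (bucket and prefix are hypothetical; see also the test
    /// module below):
    ///
    /// ```ignore
    /// algo.list_prefix("my-bucket".into(), Some("tmp/".into()))
    ///     .delete_all(
    ///         |n_listed| async move { println!("listed {} objects", n_listed) },
    ///         |report| async move { println!("deleted {} objects", report.size) },
    ///     )
    ///     .await?;
    /// ```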
    pub fn delete_all<P1, P2, F1, F2>(
        self,
        list_progress: P1,
        delete_progress: P2,
    ) -> impl Future<Output = Result<(), Error>>
    where
        P1: Fn(usize) -> F1 + Clone + Send + Sync + 'static,
        P2: Fn(RequestReport) -> F2 + Clone + Send + Sync + 'static,
        F1: Future<Output = ()> + Send + 'static,
        F2: Future<Output = ()> + Send + 'static,
    {
        // For each ListObjectsV2Output, send a request to delete all the listed objects
        let ListObjects {
            s3,
            config,
            bucket,
            stream,
            prefix: _,
        } = self;
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            config.algorithm.clone(),
            config.delete_requests.clone(),
        )));
        let n_retries = config.algorithm.n_retries;
        stream.try_for_each_concurrent(None, move |object| {
            let (s3, bucket, timeout, delete_progress2, list_progress2) = (
                s3.clone(),
                bucket.clone(),
                timeout.clone(),
                delete_progress.clone(),
                list_progress.clone(),
            );
            let objects = object
                .contents
                .unwrap_or_default() // unwrap or empty Vec
                .iter()
                .filter_map(|obj| {
                    obj.key.as_ref().map(|key| {
                        ObjectIdentifier::builder()
                            .set_key(Some(key.clone()))
                            .set_version_id(None)
                            .build()
                            .unwrap() // unwrap: shouldn't fail building as the key comes directly
                                      // from S3
                    })
                })
                .collect::<Vec<_>>();
            let n_objects = objects.len();

            async move {
                list_progress2(n_objects).await;
                let (report, _) = s3_request(
                    move || {
                        let (s3, bucket, objects) = (s3.clone(), bucket.clone(), objects.clone());
                        async move {
                            let (s3, bucket, objects) =
                                (s3.clone(), bucket.clone(), objects.clone());
                            Ok((
                                async move {
                                    s3.delete_objects()
                                        .set_bucket(Some(bucket))
                                        .set_delete(Some(
                                            Delete::builder()
                                                .set_objects(Some(objects))
                                                .build()
                                                .unwrap(), // unwrap: shouldn't fail building
                                                           // because all the input comes directly from S3
                                        ))
                                        .send()
                                        .await
                                        .map_err(|e| e.into())
                                },
                                n_objects,
                            ))
                        }
                    },
                    |_, size| size,
                    n_retries,
                    timeout.clone(),
                )
                .await?;
                timeout.lock().await.update(&report);
                delete_progress2(report).await;
                Ok(())
            }
        })
    }

    /// Flatten into a stream of `Object`s.
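    ///
    /// For example, counting the listed objects (as done in the test module below):
    ///
    /// ```ignore
    /// use futures::future::ok;
    /// let count = algo
    ///     .list_prefix("my-bucket".into(), None)
    ///     .flatten()
    ///     .try_fold(0usize, |acc, _| ok(acc + 1))
    ///     .await?;
    /// ```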
    pub fn flatten(self) -> impl Stream<Item = Result<Object, Error>> {
        self.stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
    }

    /*
    /// This function exists to provide a stream to copy all objects, for both `copy_all` and
    /// `move_all`. The `String` that is the stream's `Item` is the _source key_. An `Ok` value
    /// thus signals (relevant when used in `move_all`) that a certain key is ready for deletion.
    fn copy_all_stream<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Stream<Item = Result<String, Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let ListObjects {
            s3,
            config,
            bucket,
            stream,
            prefix: _,
        } = self;
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            config.algorithm.clone(),
            config.put_requests.clone(),
        )));
        let n_retries = config.algorithm.n_retries;
        let dest_bucket = dest_bucket.unwrap_or_else(|| bucket.clone());
        stream
            .try_filter_map(|response| ok(response.1.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .try_filter_map(|obj| {
                // Just filter out any object that does not have both of `key` and `size`
                let Object { key, size, .. } = obj;
                ok(key.and_then(|key| size.map(|size| (key, size))))
            })
            .and_then(move |(key, size)| {
                let (s3, timeout) = (s3.clone(), timeout.clone());
                let request = CopyObjectRequest {
                    copy_source: format!("{}/{}", bucket, key),
                    bucket: dest_bucket.clone(),
                    key: mapping(&key),
                    ..default_request()
                };
                // println!("COPY REQUEST\n{:#?}", request);
                s3_request(
                    move || {
                        let (s3, request) = (s3.clone(), request.clone());
                        async move {
                            let (s3, request) = (s3.clone(), request.clone());
                            Ok((
                                async move { s3.copy_object(request).context(err::CopyObject).await },
                                size as usize,
                            ))
                        }
                    },
                    |_, size| size,
                    n_retries,
                    timeout,
                )
                .map_ok(|_| key)
            })
    }

    /// Copy all listed objects to a different S3 location, as defined by `mapping` and
    /// `dest_bucket`.
    /// If `dest_bucket` is not provided, copy within the same bucket.
    pub fn copy_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .try_for_each(|_| async { Ok(()) })
    }
    // TODO: Is it possible to change copy_all so that we can move_all by just chaining copy_all
    // and delete_all? Then copy_all would need to return a stream of old keys, but does that make
    // sense in general?
    // For now, this is code duplication.
    pub fn move_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let src_bucket = self.bucket.clone();
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            self.config.algorithm.clone(),
            self.config.delete_requests.clone(),
        )));
        let n_retries = self.config.algorithm.n_retries;
        let s3 = self.s3.clone();
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .and_then(move |src_key| {
                let delete_request = DeleteObjectRequest {
                    bucket: src_bucket.clone(),
                    key: src_key,
                    ..Default::default()
                };
                let (s3, timeout) = (s3.clone(), timeout.clone());
                s3_request(
                    move || {
                        let (s3, delete_request) = (s3.clone(), delete_request.clone());
                        async move {
                            let (s3, delete_request) = (s3.clone(), delete_request.clone());
                            Ok((
                                async move {
                                    s3.delete_object(delete_request)
                                        .context(err::DeleteObject)
                                        .await
                                },
                                1,
                            ))
                        }
                    },
                    |_, _| 1,
                    n_retries,
                    timeout,
                )
                .map_ok(drop)
                .boxed()
            })
            .try_for_each(|_| async { Ok(()) })
            .boxed()
    }
    /// Move all listed objects by substituting their common prefix with `new_prefix`.
    pub fn move_to_prefix<R>(
        self,
        dest_bucket: Option<String>,
        new_prefix: String,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let old_prefix = self.prefix.clone();
        let substitute_prefix =
            move |source: &str| format!("{}{}", new_prefix, source.trim_start_matches(&old_prefix));
        self.move_all(dest_bucket, substitute_prefix, default_request)
            .boxed()
    }
    */
}

impl<S> Stream for ListObjects<S>
where
    S: Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send + Unpin,
{
    type Item = Result<ListObjectsV2Output, Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

impl S3Algo {
    /// List objects of a bucket, optionally restricted to a `prefix`.
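    ///
    /// The returned `ListObjects` is itself a stream of `ListObjectsV2Output` pages and
    /// also provides helpers such as `process`, `flatten` and `delete_all`. A sketch
    /// (bucket name is hypothetical):
    ///
    /// ```ignore
    /// let algo = S3Algo::new(client);
    /// let listing = algo.list_prefix("my-bucket".into(), Some("logs/".into()));
    /// ```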
    pub fn list_prefix(
        &self,
        bucket: String,
        prefix: Option<String>,
    ) -> ListObjects<impl Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send> {
        // TODO: Reintroduce retry and timeout

        // Keep the requested prefix (empty if listing the whole bucket), as documented
        // on `ListObjects::prefix`.
        let requested_prefix = prefix.clone().unwrap_or_default();
        let stream = self
            .s3
            .list_objects_v2()
            .bucket(bucket.clone())
            .set_prefix(prefix)
            .into_paginator()
            .send();
        // Convert the SDK's pagination stream into a futures 0.3 stream with our error type
        let stream = PaginationStreamExt::into_stream_03x(stream)
            .map_err(|source| Error::ListObjectsV2 { source });

        ListObjects {
            s3: self.s3.clone(),
            config: self.config.clone(),
            stream,
            bucket,
            prefix: requested_prefix,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::test::rand_string;
    use std::sync::atomic::{AtomicUsize, Ordering};
    #[tokio::test]
    async fn test_s3_delete_files_progress() {
        // Minio does paging at 10'000 files, so we need more than that.
        // It means this test will take a minute or two.
        let algo = S3Algo::new(testing_sdk_client().await);
        let dir = rand_string(14);
        let dir2 = dir.clone();
        const N_FILES: usize = 11_000;
        let files = (0..N_FILES).map(move |i| ObjectSource::Data {
            data: vec![1, 2, 3],
            key: format!("{}/{}.file", dir2, i),
        });
        algo.upload_files(
            "test-bucket".into(),
            files,
            |result| async move {
                if result.seq % 100 == 0 {
                    println!("{} files uploaded", result.seq);
                }
            },
            |client| client.put_object(),
        )
        .await
        .unwrap();

        let listed_files = Arc::new(AtomicUsize::new(0));
        let deleted_files = Arc::new(AtomicUsize::new(0));
        let listed_files2 = listed_files.clone();
        let deleted_files2 = deleted_files.clone();

        // Do one listing only to check the exact file names
        let present = Arc::new(Mutex::new(std::collections::HashSet::new()));
        algo.list_prefix("test-bucket".into(), Some(dir.clone()))
            .process(|object| async {
                let name = object.key.unwrap_or_else(|| "NONE".to_string());
                println!("OBJ {}", name);
                present.lock().await.insert(name);
            })
            .await
            .unwrap();
        let mut present = present.lock().await;

        // All files are present
        for i in 0..N_FILES {
            let file_name = &format!("{}/{}.file", dir, i);
            assert!(present.remove(file_name));
        }

        // No unexpected filenames.
        // (This check exists because, once, 11_200 files were listed instead of 11_000.)
        if !present.is_empty() {
            println!("Left-over object names: {:?}", present);
            panic!("Not empty ({} files)", present.len());
        }

        // Assert that number of files is N_FILES
        let count = algo
            .list_prefix("test-bucket".into(), Some(dir.clone()))
            .flatten()
            .try_fold(0usize, |acc, _| ok(acc + 1))
            .await
            .unwrap();
        assert_eq!(count, N_FILES);

        // Delete all
        algo.list_prefix("test-bucket".into(), Some(dir.clone()))
            .delete_all(
                move |n| {
                    println!("Listed {} items", n);
                    let listed_files = listed_files2.clone();
                    async move {
                        listed_files.fetch_add(n, Ordering::Relaxed);
                    }
                },
                move |del_rep| {
                    let n = del_rep.size as usize;
                    println!("Deleted {} items", n);
                    let deleted_files = deleted_files2.clone();
                    async move {
                        deleted_files.fetch_add(n, Ordering::Relaxed);
                    }
                },
            )
            .await
            .unwrap();

        // Assert number of objects listed and deleted
        assert_eq!(listed_files.load(Ordering::Relaxed), N_FILES);
        assert_eq!(deleted_files.load(Ordering::Relaxed), N_FILES);

        // Assert that number of files is 0
        let count = algo
            .list_prefix("test-bucket".into(), Some(dir))
            .flatten()
            .try_fold(0usize, |acc, _| ok(acc + 1))
            .await
            .unwrap();

        assert_eq!(count, 0);
    }
}