s3_algo/list_actions.rs

use super::*;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{Delete, Object, ObjectIdentifier};
use aws_smithy_types_convert::stream::PaginationStreamExt;
use futures::future::ok;
use futures::stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io;

/// A stream that can list objects, and (using member functions) delete or copy listed files.
pub struct ListObjects<S> {
    s3: Client,
    config: Config,
    bucket: String,
    /// Common prefix (as requested) of the listed objects. Empty string if all objects were
    /// requested.
    prefix: String,
    stream: S,
}
impl<S> ListObjects<S>
where
    S: Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send + 'static,
{
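    /// Box the inner stream, erasing its concrete type. This gives `ListObjects` a
    /// nameable type parameter, e.g. for storing it in a struct field.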
    pub fn boxed(
        self,
    ) -> ListObjects<Pin<Box<dyn Stream<Item = Result<ListObjectsV2Output, Error>> + Send>>> {
        ListObjects {
            s3: self.s3,
            config: self.config,
            bucket: self.bucket,
            stream: self.stream.boxed(),
            prefix: self.prefix,
        }
    }

    /// Calls an async closure on each individual object of the list operation.
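    ///
    /// An illustrative sketch (marked `ignore`, so it is not compiled as a doc test);
    /// `algo` is assumed to be a configured `S3Algo`, and the bucket/prefix names are
    /// examples:
    ///
    /// ```ignore
    /// algo.list_prefix("my-bucket".into(), Some("logs/".into()))
    ///     .process(|object| async move {
    ///         println!("found key: {:?}", object.key);
    ///     })
    ///     .await?;
    /// ```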
    pub async fn process<P, F>(self, f: P) -> Result<(), Error>
    where
        P: Fn(Object) -> F + Clone,
        F: Future<Output = ()>,
    {
        let ListObjects {
            stream, prefix: _, ..
        } = self;
        stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .try_for_each_concurrent(None, move |object| {
                let f = f.clone();
                async move {
                    f(object).await;
                    Ok(())
                }
            })
            .await
    }
    /// Download all listed objects, returning a stream of `(key, body, content_length)` tuples.
    /// Used as a basis for other `download_all_*` functions.
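    ///
    /// A usage sketch (marked `ignore`; `algo` and the names are assumed examples):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// algo.list_prefix("my-bucket".into(), Some("data/".into()))
    ///     .download_all_stream()
    ///     .try_for_each(|(key, _body, content_length)| async move {
    ///         println!("downloaded {} ({:?} bytes)", key, content_length);
    ///         Ok(())
    ///     })
    ///     .await?;
    /// ```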
    pub fn download_all_stream(
        self,
    ) -> impl Stream<Item = Result<(String, ByteStream, Option<i64>), Error>> {
        let ListObjects {
            s3,
            config: _,
            bucket,
            stream,
            prefix: _,
        } = self;
        stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .map(|result| {
                result.and_then(|obj| {
                    let Object { key, size, .. } = obj;
                    if let Some(key) = key {
                        Ok((key, size))
                    } else {
                        Err(Error::MissingKeyOrSize)
                    }
                })
            })
            .and_then(move |(key, _)| {
                let (s3, bucket) = (s3.clone(), bucket.clone());

                async move {
                    let output = s3
                        .get_object()
                        .bucket(bucket.clone())
                        .key(key.clone())
                        .send()
                        .await
                        .context(err::GetObject {
                            key: key.clone(),
                            bucket,
                        })?;
                    Ok((key, output.body, output.content_length))
                }
            })
    }

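    /// Download all listed objects, collecting each object's contents into an in-memory
    /// `Vec<u8>`.
    ///
    /// A sketch (marked `ignore`; `algo` and the names are assumed examples):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// algo.list_prefix("my-bucket".into(), Some("data/".into()))
    ///     .download_all_to_vec()
    ///     .try_for_each(|(key, bytes)| async move {
    ///         println!("{}: {} bytes", key, bytes.len());
    ///         Ok(())
    ///     })
    ///     .await?;
    /// ```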
    pub fn download_all_to_vec(self) -> impl Stream<Item = Result<(String, Vec<u8>), Error>> {
        self.download_all_stream()
            .and_then(|(key, body, _)| async move {
                let mut contents = vec![];
                io::copy(&mut body.into_async_read(), &mut contents)
                    .await
                    .context(err::TokioIo)?;
                Ok((key, contents))
            })
    }

    /*
    /// Download all listed objects to the file system.
    /// UNIMPLEMENTED.
    pub fn download_all(self) -> impl Future<Output = Result<(), Error>> {
        // TODO use download_all_stream
        ok(unimplemented!())
    }
    */

    /// Delete all listed objects.
    ///
    /// With the two arguments, you can implement a detailed real-time progress report of both how
    /// many files have been listed and how many files have been deleted.
    ///
    /// `list_progress`: Closure that is given the number of files listed as argument. It is
    /// called several times, once for each batch of files listed.
    /// `delete_progress`: Closure that is given the `RequestReport` of a delete request. The
    /// `size` field refers to the number of files deleted.
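    ///
    /// A sketch of a simple progress report (marked `ignore`; `algo` and the names are
    /// assumed examples):
    ///
    /// ```ignore
    /// algo.list_prefix("my-bucket".into(), Some("tmp/".into()))
    ///     .delete_all(
    ///         |n_listed| async move { println!("listed {} objects", n_listed) },
    ///         |report| async move { println!("deleted {} objects", report.size) },
    ///     )
    ///     .await?;
    /// ```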
    pub fn delete_all<P1, P2, F1, F2>(
        self,
        list_progress: P1,
        delete_progress: P2,
    ) -> impl Future<Output = Result<(), Error>>
    where
        P1: Fn(usize) -> F1 + Clone + Send + Sync + 'static,
        P2: Fn(RequestReport) -> F2 + Clone + Send + Sync + 'static,
        F1: Future<Output = ()> + Send + 'static,
        F2: Future<Output = ()> + Send + 'static,
    {
        // For each ListObjectsV2Output, send a request to delete all the listed objects
        let ListObjects {
            s3,
            config,
            bucket,
            stream,
            prefix: _,
        } = self;
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            config.algorithm.clone(),
            config.delete_requests.clone(),
        )));
        let n_retries = config.algorithm.n_retries;
        stream.try_for_each_concurrent(None, move |object| {
            let (s3, bucket, timeout, delete_progress2, list_progress2) = (
                s3.clone(),
                bucket.clone(),
                timeout.clone(),
                delete_progress.clone(),
                list_progress.clone(),
            );
            let objects = object
                .contents
                .unwrap_or_default() // unwrap or empty Vec
                .iter()
                .filter_map(|obj| {
                    obj.key.as_ref().map(|key| {
                        ObjectIdentifier::builder()
                            .set_key(Some(key.clone()))
                            .set_version_id(None)
                            .build()
                            .unwrap() // unwrap: shouldn't fail building as the key comes directly
                                      // from S3
                    })
                })
                .collect::<Vec<_>>();
            let n_objects = objects.len();

            async move {
                list_progress2(n_objects).await;
                let (report, _) = s3_request(
                    move || {
                        let (s3, bucket, objects) = (s3.clone(), bucket.clone(), objects.clone());
                        async move {
                            let (s3, bucket, objects) =
                                (s3.clone(), bucket.clone(), objects.clone());
                            Ok((
                                async move {
                                    s3.delete_objects()
                                        .set_bucket(Some(bucket))
                                        .set_delete(Some(
                                            Delete::builder()
                                                .set_objects(Some(objects))
                                                .build()
                                                .unwrap(), // unwrap: shouldn't fail building
                                                           // because all the input comes directly from S3
                                        ))
                                        .send()
                                        .await
                                        .map_err(|e| e.into())
                                },
                                n_objects,
                            ))
                        }
                    },
                    |_, size| size,
                    n_retries,
                    timeout.clone(),
                )
                .await?;
                timeout.lock().await.update(&report);
                delete_progress2(report).await;
                Ok(())
            }
        })
    }

    /// Flatten the list responses into a stream of individual `Object`s.
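    ///
    /// For example, counting the listed objects (an `ignore`d sketch; `algo` and the
    /// names are assumed examples):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// let count = algo
    ///     .list_prefix("my-bucket".into(), Some("data/".into()))
    ///     .flatten()
    ///     .try_fold(0usize, |acc, _| async move { Ok(acc + 1) })
    ///     .await?;
    /// ```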
    pub fn flatten(self) -> impl Stream<Item = Result<Object, Error>> {
        self.stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
    }

    /*
    /// This function exists to provide a stream to copy all objects, for both `copy_all` and
    /// `move_all`. The `String` that is the stream's `Item` is the _source key_. An `Ok` value
    /// thus signals (relevant when used in `move_all`) that a certain key is ready for deletion.
    fn copy_all_stream<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Stream<Item = Result<String, Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let ListObjects {
            s3,
            config,
            bucket,
            stream,
            prefix: _,
        } = self;
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            config.algorithm.clone(),
            config.put_requests.clone(),
        )));
        let n_retries = config.algorithm.n_retries;
        let dest_bucket = dest_bucket.unwrap_or_else(|| bucket.clone());
        stream
            .try_filter_map(|response| ok(response.1.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            .try_filter_map(|obj| {
                // Just filter out any object that does not have both `key` and `size`
                let Object { key, size, .. } = obj;
                ok(key.and_then(|key| size.map(|size| (key, size))))
            })
            .and_then(move |(key, size)| {
                let (s3, timeout) = (s3.clone(), timeout.clone());
                let request = CopyObjectRequest {
                    copy_source: format!("{}/{}", bucket, key),
                    bucket: dest_bucket.clone(),
                    key: mapping(&key),
                    ..default_request()
                };
                // println!("COPY REQUEST\n{:#?}", request);
                s3_request(
                    move || {
                        let (s3, request) = (s3.clone(), request.clone());
                        async move {
                            let (s3, request) = (s3.clone(), request.clone());
                            Ok((
                                async move { s3.copy_object(request).context(err::CopyObject).await },
                                size as usize,
                            ))
                        }
                    },
                    |_, size| size,
                    n_retries,
                    timeout,
                )
                .map_ok(|_| key)
            })
    }

    /// Copy all listed objects to a different S3 location as defined in `mapping` and
    /// `dest_bucket`.
    /// If `dest_bucket` is not provided, copy to the same bucket.
    pub fn copy_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .try_for_each(|_| async { Ok(()) })
    }
    // TODO: Is it possible to change copy_all so that we can move_all by just chaining copy_all
    // and delete_all? Then copy_all would need to return a stream of old keys, but does that make
    // sense in general?
    // For now, this is code duplication.
    pub fn move_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let src_bucket = self.bucket.clone();
        let timeout = Arc::new(Mutex::new(TimeoutState::new(
            self.config.algorithm.clone(),
            self.config.delete_requests.clone(),
        )));
        let n_retries = self.config.algorithm.n_retries;
        let s3 = self.s3.clone();
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .and_then(move |src_key| {
                let delete_request = DeleteObjectRequest {
                    bucket: src_bucket.clone(),
                    key: src_key,
                    ..Default::default()
                };
                let (s3, timeout) = (s3.clone(), timeout.clone());
                s3_request(
                    move || {
                        let (s3, delete_request) = (s3.clone(), delete_request.clone());
                        async move {
                            let (s3, delete_request) = (s3.clone(), delete_request.clone());
                            Ok((
                                async move {
                                    s3.delete_object(delete_request)
                                        .context(err::DeleteObject)
                                        .await
                                },
                                1,
                            ))
                        }
                    },
                    |_, _| 1,
                    n_retries,
                    timeout,
                )
                .map_ok(drop)
                .boxed()
            })
            .try_for_each(|_| async { Ok(()) })
            .boxed()
    }
    /// Move all listed objects by substituting their common prefix with `new_prefix`.
    pub fn move_to_prefix<R>(
        self,
        dest_bucket: Option<String>,
        new_prefix: String,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let old_prefix = self.prefix.clone();
        let substitute_prefix =
            move |source: &str| format!("{}{}", new_prefix, source.trim_start_matches(&old_prefix));
        self.move_all(dest_bucket, substitute_prefix, default_request)
            .boxed()
    }
    */
}

impl<S> Stream for ListObjects<S>
where
    S: Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send + Unpin,
{
    type Item = Result<ListObjectsV2Output, Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

impl S3Algo {
    /// List objects in a bucket, optionally restricted to a prefix.
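    ///
    /// The returned `ListObjects` is lazy: no request is sent until the stream is
    /// polled. A usage sketch (marked `ignore`; `client`, the bucket, and the prefix
    /// are assumed examples):
    ///
    /// ```ignore
    /// let algo = s3_algo::S3Algo::new(client);
    /// algo.list_prefix("my-bucket".into(), Some("logs/2021-".into()))
    ///     .process(|object| async move { println!("{:?}", object.key) })
    ///     .await?;
    /// ```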
    pub fn list_prefix(
        &self,
        bucket: String,
        prefix: Option<String>,
    ) -> ListObjects<impl Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send> {
        // TODO: Reintroduce retry and timeout

        let stream = self
            .s3
            .list_objects_v2()
            .bucket(bucket.clone())
            .set_prefix(prefix.clone())
            .into_paginator()
            .send();
        let stream = PaginationStreamExt::into_stream_03x(stream)
            // Map the SDK's pagination stream into our error type
            .map_err(|source| Error::ListObjectsV2 { source });

        ListObjects {
            s3: self.s3.clone(),
            config: self.config.clone(),
            stream,
            bucket,
            // Store the requested prefix so that the `prefix` field matches its
            // documentation (empty string if all objects were requested)
            prefix: prefix.unwrap_or_default(),
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::test::rand_string;
    use std::sync::atomic::{AtomicUsize, Ordering};
    #[tokio::test]
    async fn test_s3_delete_files_progress() {
        // Minio does paging at 10,000 files, so we need more than that.
        // This means the test will take a minute or two.
        let algo = S3Algo::new(testing_sdk_client().await);
        let dir = rand_string(14);
        let dir2 = dir.clone();
        const N_FILES: usize = 11_000;
        let files = (0..N_FILES).map(move |i| ObjectSource::Data {
            data: vec![1, 2, 3],
            key: format!("{}/{}.file", dir2, i),
        });
        algo.upload_files(
            "test-bucket".into(),
            files,
            |result| async move {
                if result.seq % 100 == 0 {
                    println!("{} files uploaded", result.seq);
                }
            },
            |client| client.put_object(),
        )
        .await
        .unwrap();

        let listed_files = Arc::new(AtomicUsize::new(0));
        let deleted_files = Arc::new(AtomicUsize::new(0));
        let listed_files2 = listed_files.clone();
        let deleted_files2 = deleted_files.clone();

        // Do one listing only to check the exact file names
        let present = Arc::new(Mutex::new(std::collections::HashSet::new()));
        algo.list_prefix("test-bucket".into(), Some(dir.clone()))
            .process(|object| async {
                let name = object.key.unwrap_or_else(|| "NONE".to_string());
                println!("OBJ {}", name);
                present.lock().await.insert(name);
            })
            .await
            .unwrap();
        let mut present = present.lock().await;

        // All files are present
        for i in 0..N_FILES {
            let file_name = &format!("{}/{}.file", dir, i);
            assert!(present.remove(file_name));
        }

        // No unexpected file names.
        // (Guards against a bug observed once, where 11_200 files were listed instead of 11_000.)
        if !present.is_empty() {
            println!("Left-over object names: {:?}", present);
            panic!("Not empty ({} files)", present.len());
        }

        // Assert that the number of files is N_FILES
        let count = algo
            .list_prefix("test-bucket".into(), Some(dir.clone()))
            .flatten()
            .try_fold(0usize, |acc, _| ok(acc + 1))
            .await
            .unwrap();
        assert_eq!(count, N_FILES);

        // Delete all
        algo.list_prefix("test-bucket".into(), Some(dir.clone()))
            .delete_all(
                move |n| {
                    println!("Listed {} items", n);
                    let listed_files = listed_files2.clone();
                    async move {
                        listed_files.fetch_add(n, Ordering::Relaxed);
                    }
                },
                move |del_rep| {
                    let n = del_rep.size as usize;
                    println!("Deleted {} items", n);
                    let deleted_files = deleted_files2.clone();
                    async move {
                        deleted_files.fetch_add(n, Ordering::Relaxed);
                    }
                },
            )
            .await
            .unwrap();

        // Assert number of objects listed and deleted
        assert_eq!(listed_files.load(Ordering::Relaxed), N_FILES);
        assert_eq!(deleted_files.load(Ordering::Relaxed), N_FILES);

        // Assert that the number of files is 0
        let count = algo
            .list_prefix("test-bucket".into(), Some(dir))
            .flatten()
            .try_fold(0usize, |acc, _| ok(acc + 1))
            .await
            .unwrap();

        assert_eq!(count, 0);
    }
}