s3_ext/
iter.rs

1//! Iterators over `Object`s
2//!
3//! # Example
4//!
5//! ```
6//! use futures::stream::{Stream, StreamExt, TryStreamExt};
7//! use futures::future::try_join_all;
8//! use std::future::Future;
9//! use rand::RngCore;
10//! use rusoto_core::Region;
11//! use rusoto_s3::{CreateBucketRequest, PutObjectRequest, S3, S3Client, DeleteObjectRequest, DeleteBucketRequest};
12//! use s3_ext::S3Ext;
13//! use std::env;
14//! use tokio::io::AsyncReadExt;
15//!
16//! use s3_ext::error::S3ExtError;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), S3ExtError> {
20//!     let bucket = format!("s3-ext-iter-module-example-{}", rand::thread_rng().next_u64());
21//!
22//!     // setup client
23//!     let client = if let Ok(endpoint) = env::var("S3_ENDPOINT") {
24//!         let access_key = "ANTN35UAENTS5UIAEATD".to_string();
25//!         let secret_key = "TtnuieannGt2rGuie2t8Tt7urarg5nauedRndrur".to_string();
26//!         let region = Region::Custom {
27//!             name: "eu-west-1".to_string(),
28//!             endpoint,
29//!         };
30//!         s3_ext::new_s3client_with_credentials(region, access_key, secret_key)?
31//!     } else {
32//!         S3Client::new(Region::UsEast1)
33//!     };
34//!
35//!     // create bucket
36//!
37//!     client
38//!         .create_bucket(CreateBucketRequest {
39//!             bucket: bucket.clone(),
40//!             ..Default::default()
41//!         })
42//!         .await?;
43//!
44//!     // create test objects
45//!     let mut keys = Vec::new();
46//!     for obj in (0..5).map(|n| format!("object_{:02}", n)) {
47//!         client
48//!             .put_object(PutObjectRequest {
49//!                 bucket: bucket.clone(),
50//!                 key: obj.to_string(),
51//!                 body: Some(obj.as_bytes().to_vec().into()),
52//!                 ..Default::default()
53//!             })
54//!             .await?;
55//!         keys.push(obj);
56//!     }
57//!
//!     // iterate over objects (sorted alphabetically)
59//!
60//!    let objects: Vec<_> = client
61//!         .stream_objects(&bucket)
62//!         .map(|res| res.map(|obj| obj.key))
63//!         .try_collect()
64//!         .await?;
65//!    let objects: Vec<_> = objects.into_iter().filter_map(|x| x).collect();
66//!
67//!     assert_eq!(
68//!         objects.as_slice(),
69//!         &[
70//!             "object_00",
71//!             "object_01",
72//!             "object_02",
73//!             "object_03",
74//!             "object_04",
75//!         ]
76//!     );
77//!
//!     // iterate over objects and fetch their content on the fly (sorted alphabetically)
79//!     let results: Result<Vec<_>, _> = client
80//!         .stream_get_objects(&bucket)
81//!         .map(|res| res.map(|(key, obj)| (key, obj.body)))
82//!         .try_collect()
83//!         .await;
84//!
85//!     let futures: Vec<_> = results?
86//!         .into_iter()
87//!         .map(|(key, body)| async move {
88//!             let mut buf = Vec::new();
89//!             if let Some(body) = body {
90//!                 match body.into_async_read().read_to_end(&mut buf).await {
91//!                     Ok(_) => Ok(Some((key, buf))),
92//!                     Err(e) => Err(e),
93//!                 }
94//!             } else {
95//!                 Ok(None)
96//!             }
97//!         })
98//!         .collect();
99//!     let results: Result<Vec<_>, _> = try_join_all(futures).await;
100//!     let objects: Vec<_> = results?.into_iter().filter_map(|x| x).collect();
101//!     for key in keys {
102//!     client.delete_object(DeleteObjectRequest {
103//!         bucket: bucket.clone(),
104//!         key: key.to_string(),
105//!         ..Default::default()
106//!     }).await.unwrap();
107//!     }
108//!     client.delete_bucket(DeleteBucketRequest {
109//!         bucket: bucket.into(),
110//!         ..Default::default()
111//!     }).await.unwrap();
112//!
113//!     for (i, (key, body)) in objects.iter().enumerate() {
114//!         let expected = format!("object_{:02}", i);
115//!         assert_eq!(key, &expected);
116//!         assert_eq!(body.as_slice(), expected.as_bytes());
117//!     }
118//!     Ok(())
119//! }
120//! ```
121
122use crate::error::{S3ExtError, S3ExtResult};
123use futures::{
124    ready,
125    stream::Stream,
126    task::{Context, Poll},
127    FutureExt,
128};
129use rusoto_core::{RusotoError, RusotoResult};
130use rusoto_s3::{
131    GetObjectError, GetObjectOutput, GetObjectRequest, ListObjectsV2Error, ListObjectsV2Output,
132    ListObjectsV2Request, Object, S3Client, S3,
133};
134use std::{future::Future, mem, pin::Pin, vec::IntoIter};
135
136/// Iterator-like objects, forms the basis of `ObjectStream`
137#[derive(Clone)]
138pub struct ObjectIter {
139    client: S3Client,
140    request: ListObjectsV2Request,
141    objects: IntoIter<Object>,
142    exhausted: bool,
143}
144
145impl ObjectIter {
146    fn new(
147        client: &S3Client,
148        bucket: impl Into<String>,
149        prefix: Option<impl Into<String>>,
150    ) -> Self {
151        let request = ListObjectsV2Request {
152            bucket: bucket.into(),
153            max_keys: Some(1000),
154            prefix: prefix.map(|s| s.into()),
155            ..Default::default()
156        };
157
158        ObjectIter {
159            client: client.clone(),
160            request,
161            objects: Vec::new().into_iter(),
162            exhausted: false,
163        }
164    }
165
166    async fn next_objects(&mut self) -> RusotoResult<(), ListObjectsV2Error> {
167        let resp = self.client.list_objects_v2(self.request.clone()).await?;
168        self.update_objects(resp);
169        Ok(())
170    }
171
172    fn update_objects(&mut self, resp: ListObjectsV2Output) {
173        self.objects = resp.contents.unwrap_or_default().into_iter();
174        match resp.next_continuation_token {
175            next @ Some(_) => self.request.continuation_token = next,
176            None => self.exhausted = true,
177        };
178    }
179
180    async fn last_internal(&mut self) -> RusotoResult<Option<Object>, ListObjectsV2Error> {
181        let mut objects = mem::replace(&mut self.objects, Vec::new().into_iter());
182        while !self.exhausted {
183            self.next_objects().await?;
184            if self.objects.len() > 0 {
185                objects = mem::replace(&mut self.objects, Vec::new().into_iter());
186            }
187        }
188        Ok(objects.last())
189    }
190
191    /// Get the next object (or None if there are no more objects), may return
192    /// an error when fetching objects.
193    pub async fn next_object(&mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
194        if let object @ Some(_) = self.objects.next() {
195            Ok(object)
196        } else if self.exhausted {
197            Ok(None)
198        } else {
199            self.next_objects().await?;
200            Ok(self.objects.next())
201        }
202    }
203
204    /// Consume the iterator and return the number of objects
205    pub async fn count(mut self) -> Result<usize, RusotoError<ListObjectsV2Error>> {
206        let mut count = self.objects.len();
207        while !self.exhausted {
208            self.next_objects().await?;
209            count += self.objects.len();
210        }
211        Ok(count)
212    }
213
214    /// Consume the iterator and return the last object
215    pub async fn last(mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
216        self.last_internal().await
217    }
218
219    /// Consume the iterator and return the nth object
220    pub async fn nth(
221        &mut self,
222        mut n: usize,
223    ) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
224        while self.objects.len() <= n && !self.exhausted {
225            n -= self.objects.len();
226            self.next_objects().await?;
227        }
228        Ok(self.objects.nth(n))
229    }
230}
231
232type ObjResult = RusotoResult<ListObjectsV2Output, ListObjectsV2Error>;
233type NextObjFuture = Pin<Box<dyn Future<Output = ObjResult> + Send>>;
234
235/// Stream over objects
236pub struct ObjectStream {
237    iter: ObjectIter,
238    fut: Option<NextObjFuture>,
239}
240
241impl ObjectStream {
242    pub(crate) fn new(
243        client: &S3Client,
244        bucket: impl Into<String>,
245        prefix: Option<impl Into<String>>,
246    ) -> Self {
247        Self {
248            iter: ObjectIter::new(client, bucket, prefix),
249            fut: None,
250        }
251    }
252
253    /// Return a reference to `ObjectIter`
254    pub fn get_iter(&self) -> &ObjectIter {
255        &self.iter
256    }
257
258    /// Consume the string and return the `ObjectIter`
259    pub fn into_iter(self) -> ObjectIter {
260        self.iter
261    }
262
263    async fn get_objects(
264        client: S3Client,
265        request: ListObjectsV2Request,
266    ) -> RusotoResult<ListObjectsV2Output, ListObjectsV2Error> {
267        client.list_objects_v2(request).await
268    }
269}
270
271// This is kind of ugly but seems to work as intended, I hope that one day this
272// can be done more simply...
273impl Stream for ObjectStream {
274    type Item = RusotoResult<Object, ListObjectsV2Error>;
275    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
276        if self.as_mut().fut.is_none() {
277            if let Some(object) = self.as_mut().iter.objects.next() {
278                return Poll::Ready(Some(Ok(object)));
279            } else if self.as_mut().iter.exhausted {
280                return Poll::Ready(None);
281            }
282            let client = self.as_mut().iter.client.clone();
283            let request = self.as_mut().iter.request.clone();
284            self.as_mut()
285                .fut
286                .replace(Box::pin(Self::get_objects(client, request)));
287        }
288
289        let result = ready!(self.as_mut().fut.as_mut().unwrap().poll_unpin(cx));
290        self.as_mut().fut.take();
291
292        match result {
293            Ok(resp) => self.as_mut().iter.update_objects(resp),
294            Err(e) => return Poll::Ready(Some(Err(e))),
295        }
296        self.as_mut()
297            .iter
298            .objects
299            .next()
300            .map_or(Poll::Ready(None), |object| Poll::Ready(Some(Ok(object))))
301    }
302}
303
304/// Iterator-like object retrieving all objects or objects with a given prefix
305///
306/// The iterator yields tuples of `(key, object)`.
307#[derive(Clone)]
308pub struct GetObjectIter {
309    inner: ObjectIter,
310    bucket: String,
311}
312
313impl GetObjectIter {
314    fn new(
315        client: &S3Client,
316        bucket: impl Into<String>,
317        prefix: Option<impl Into<String>>,
318    ) -> Self {
319        let bucket = bucket.into();
320        GetObjectIter {
321            inner: ObjectIter::new(client, &bucket, prefix),
322            bucket,
323        }
324    }
325
326    async fn retrieve(
327        &mut self,
328        object: Option<Object>,
329    ) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
330        match object {
331            Some(object) => {
332                let key = object
333                    .key
334                    .ok_or(S3ExtError::Other("response is missing key"))?;
335                let request = GetObjectRequest {
336                    bucket: self.bucket.clone(),
337                    key,
338                    ..Default::default()
339                };
340                match self.inner.client.get_object(request.clone()).await {
341                    Ok(o) => {
342                        let key = request.key;
343                        Ok(Some((key, o)))
344                    }
345                    Err(e) => Err(e.into()),
346                }
347            }
348            None => Ok(None),
349        }
350    }
351
352    /// Retrieve the next object
353    pub async fn retrieve_next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
354        let next = self.inner.next_object().await?;
355        self.retrieve(next).await
356    }
357
358    #[inline]
359    pub async fn next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
360        let next = self.inner.next_object().await?;
361        self.retrieve(next).await
362    }
363
364    #[inline]
365    /// Consume the iterator and return the number of elements
366    pub async fn count(self) -> Result<usize, S3ExtError> {
367        self.inner.count().await.map_err(|e| e.into())
368    }
369
370    #[inline]
371    /// Consume the iterator and retreive the last element
372    pub async fn last(mut self) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
373        let last = self.inner.last_internal().await?;
374        self.retrieve(last).await
375    }
376
377    #[inline]
378    /// Consume the iterator and return the nth element
379    pub async fn nth(&mut self, n: usize) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
380        let nth = self.inner.nth(n).await?;
381        self.retrieve(nth).await
382    }
383}
384
385type GetObjResult = RusotoResult<GetObjectOutput, GetObjectError>;
386type NextGetObjFuture = Pin<Box<dyn Future<Output = GetObjResult> + Send>>;
387
388/// Stream which retrieves objects
389pub struct GetObjectStream {
390    iter: GetObjectIter,
391    next: Option<Object>,
392    key: Option<String>,
393    fut0: Option<NextObjFuture>,
394    fut1: Option<NextGetObjFuture>,
395}
396
397impl GetObjectStream {
398    pub(crate) fn new(
399        client: &S3Client,
400        bucket: impl Into<String>,
401        prefix: Option<impl Into<String>>,
402    ) -> Self {
403        Self {
404            iter: GetObjectIter::new(client, bucket, prefix),
405            next: None,
406            key: None,
407            fut0: None,
408            fut1: None,
409        }
410    }
411
412    /// Return a reference to our `GetObjectIter` object
413    pub fn get_iter(&self) -> &GetObjectIter {
414        &self.iter
415    }
416
417    /// Return our `GetObjectIter` object
418    pub fn into_iter(self) -> GetObjectIter {
419        self.iter
420    }
421
422    /// Return a reference to our `ObjectIter` object
423    pub fn get_inner(&self) -> &ObjectIter {
424        &self.iter.inner
425    }
426
427    /// Return our `ObjectIter` object
428    pub fn into_inner(self) -> ObjectIter {
429        self.iter.inner
430    }
431
432    async fn get_object(
433        client: S3Client,
434        request: GetObjectRequest,
435    ) -> RusotoResult<GetObjectOutput, GetObjectError> {
436        client.get_object(request).await
437    }
438}
439
440impl Stream for GetObjectStream {
441    type Item = S3ExtResult<(String, GetObjectOutput)>;
442    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
443        if self.as_mut().fut0.is_none() && self.as_mut().fut1.is_none() {
444            if let Some(object) = self.as_mut().iter.inner.objects.next() {
445                self.as_mut().next.replace(object);
446            } else if self.as_mut().iter.inner.exhausted {
447                return Poll::Ready(None);
448            } else {
449                let client = self.as_mut().iter.inner.client.clone();
450                let request = self.as_mut().iter.inner.request.clone();
451                self.as_mut()
452                    .fut0
453                    .replace(Box::pin(ObjectStream::get_objects(client, request)));
454            }
455        }
456
457        assert!(!(self.as_mut().fut0.is_some() && self.as_mut().fut1.is_some()));
458
459        if self.as_mut().fut0.is_some() {
460            let result = ready!(self.as_mut().fut0.as_mut().unwrap().poll_unpin(cx));
461            self.as_mut().fut0.take();
462
463            match result {
464                Ok(resp) => self.as_mut().iter.inner.update_objects(resp),
465                Err(e) => return Poll::Ready(Some(Err(e.into()))),
466            }
467            match self.as_mut().iter.inner.objects.next() {
468                Some(next) => {
469                    self.as_mut().next.replace(next);
470                }
471                None => return Poll::Ready(None),
472            }
473        }
474
475        if let Some(next) = self.as_mut().next.take() {
476            let key = if let Some(key) = next.key {
477                key
478            } else {
479                return Poll::Ready(Some(Err(S3ExtError::Other("response is missing key"))));
480            };
481            self.as_mut().key.replace(key.clone());
482            let client = self.as_mut().iter.inner.client.clone();
483            let request = GetObjectRequest {
484                bucket: self.as_mut().iter.bucket.clone(),
485                key,
486                ..Default::default()
487            };
488            self.as_mut()
489                .fut1
490                .replace(Box::pin(Self::get_object(client, request)));
491        }
492
493        assert!(self.as_mut().fut0.is_none());
494
495        if self.as_mut().fut1.is_some() {
496            let result = ready!(self.as_mut().fut1.as_mut().unwrap().poll_unpin(cx));
497            self.as_mut().fut1.take();
498            match result {
499                Ok(obj) => Poll::Ready(Some(Ok((self.as_mut().key.take().unwrap(), obj)))),
500                Err(e) => Poll::Ready(Some(Err(e.into()))),
501            }
502        } else {
503            panic!("We shouldn't ever get here...");
504        }
505    }
506}