1use crate::error::{S3ExtError, S3ExtResult};
123use futures::{
124 ready,
125 stream::Stream,
126 task::{Context, Poll},
127 FutureExt,
128};
129use rusoto_core::{RusotoError, RusotoResult};
130use rusoto_s3::{
131 GetObjectError, GetObjectOutput, GetObjectRequest, ListObjectsV2Error, ListObjectsV2Output,
132 ListObjectsV2Request, Object, S3Client, S3,
133};
134use std::{future::Future, mem, pin::Pin, vec::IntoIter};
135
/// Paginated cursor over the objects returned by `ListObjectsV2` for a bucket.
///
/// Only one page of results is buffered at a time; further pages are
/// requested with the stored continuation token until a response arrives
/// without one.
#[derive(Clone)]
pub struct ObjectIter {
    /// Client used to issue `list_objects_v2` requests.
    client: S3Client,
    /// Request template; its `continuation_token` is updated after each page.
    request: ListObjectsV2Request,
    /// Objects of the most recently fetched page that have not been yielded yet.
    objects: IntoIter<Object>,
    /// Set once a response arrives without a `next_continuation_token`.
    exhausted: bool,
}
144
145impl ObjectIter {
146 fn new(
147 client: &S3Client,
148 bucket: impl Into<String>,
149 prefix: Option<impl Into<String>>,
150 ) -> Self {
151 let request = ListObjectsV2Request {
152 bucket: bucket.into(),
153 max_keys: Some(1000),
154 prefix: prefix.map(|s| s.into()),
155 ..Default::default()
156 };
157
158 ObjectIter {
159 client: client.clone(),
160 request,
161 objects: Vec::new().into_iter(),
162 exhausted: false,
163 }
164 }
165
166 async fn next_objects(&mut self) -> RusotoResult<(), ListObjectsV2Error> {
167 let resp = self.client.list_objects_v2(self.request.clone()).await?;
168 self.update_objects(resp);
169 Ok(())
170 }
171
172 fn update_objects(&mut self, resp: ListObjectsV2Output) {
173 self.objects = resp.contents.unwrap_or_default().into_iter();
174 match resp.next_continuation_token {
175 next @ Some(_) => self.request.continuation_token = next,
176 None => self.exhausted = true,
177 };
178 }
179
180 async fn last_internal(&mut self) -> RusotoResult<Option<Object>, ListObjectsV2Error> {
181 let mut objects = mem::replace(&mut self.objects, Vec::new().into_iter());
182 while !self.exhausted {
183 self.next_objects().await?;
184 if self.objects.len() > 0 {
185 objects = mem::replace(&mut self.objects, Vec::new().into_iter());
186 }
187 }
188 Ok(objects.last())
189 }
190
191 pub async fn next_object(&mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
194 if let object @ Some(_) = self.objects.next() {
195 Ok(object)
196 } else if self.exhausted {
197 Ok(None)
198 } else {
199 self.next_objects().await?;
200 Ok(self.objects.next())
201 }
202 }
203
204 pub async fn count(mut self) -> Result<usize, RusotoError<ListObjectsV2Error>> {
206 let mut count = self.objects.len();
207 while !self.exhausted {
208 self.next_objects().await?;
209 count += self.objects.len();
210 }
211 Ok(count)
212 }
213
214 pub async fn last(mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
216 self.last_internal().await
217 }
218
219 pub async fn nth(
221 &mut self,
222 mut n: usize,
223 ) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
224 while self.objects.len() <= n && !self.exhausted {
225 n -= self.objects.len();
226 self.next_objects().await?;
227 }
228 Ok(self.objects.nth(n))
229 }
230}
231
/// Outcome of a single `ListObjectsV2` request.
type ObjResult = RusotoResult<ListObjectsV2Output, ListObjectsV2Error>;
/// Boxed, sendable future resolving to one listing page.
type NextObjFuture = Pin<Box<dyn Future<Output = ObjResult> + Send>>;
234
/// `Stream` adaptor around an [`ObjectIter`] that yields the listed objects.
pub struct ObjectStream {
    /// Cursor holding the client, the request and the buffered page.
    iter: ObjectIter,
    /// Listing request currently in flight, if any.
    fut: Option<NextObjFuture>,
}
240
241impl ObjectStream {
242 pub(crate) fn new(
243 client: &S3Client,
244 bucket: impl Into<String>,
245 prefix: Option<impl Into<String>>,
246 ) -> Self {
247 Self {
248 iter: ObjectIter::new(client, bucket, prefix),
249 fut: None,
250 }
251 }
252
253 pub fn get_iter(&self) -> &ObjectIter {
255 &self.iter
256 }
257
258 pub fn into_iter(self) -> ObjectIter {
260 self.iter
261 }
262
263 async fn get_objects(
264 client: S3Client,
265 request: ListObjectsV2Request,
266 ) -> RusotoResult<ListObjectsV2Output, ListObjectsV2Error> {
267 client.list_objects_v2(request).await
268 }
269}
270
271impl Stream for ObjectStream {
274 type Item = RusotoResult<Object, ListObjectsV2Error>;
275 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
276 if self.as_mut().fut.is_none() {
277 if let Some(object) = self.as_mut().iter.objects.next() {
278 return Poll::Ready(Some(Ok(object)));
279 } else if self.as_mut().iter.exhausted {
280 return Poll::Ready(None);
281 }
282 let client = self.as_mut().iter.client.clone();
283 let request = self.as_mut().iter.request.clone();
284 self.as_mut()
285 .fut
286 .replace(Box::pin(Self::get_objects(client, request)));
287 }
288
289 let result = ready!(self.as_mut().fut.as_mut().unwrap().poll_unpin(cx));
290 self.as_mut().fut.take();
291
292 match result {
293 Ok(resp) => self.as_mut().iter.update_objects(resp),
294 Err(e) => return Poll::Ready(Some(Err(e))),
295 }
296 self.as_mut()
297 .iter
298 .objects
299 .next()
300 .map_or(Poll::Ready(None), |object| Poll::Ready(Some(Ok(object))))
301 }
302}
303
/// Cursor that lists the objects of a bucket and downloads each of them
/// with `GetObject`.
#[derive(Clone)]
pub struct GetObjectIter {
    /// Listing cursor providing the keys (and the client used for downloads).
    inner: ObjectIter,
    /// Bucket name used for the `GetObject` requests.
    bucket: String,
}
312
313impl GetObjectIter {
314 fn new(
315 client: &S3Client,
316 bucket: impl Into<String>,
317 prefix: Option<impl Into<String>>,
318 ) -> Self {
319 let bucket = bucket.into();
320 GetObjectIter {
321 inner: ObjectIter::new(client, &bucket, prefix),
322 bucket,
323 }
324 }
325
326 async fn retrieve(
327 &mut self,
328 object: Option<Object>,
329 ) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
330 match object {
331 Some(object) => {
332 let key = object
333 .key
334 .ok_or(S3ExtError::Other("response is missing key"))?;
335 let request = GetObjectRequest {
336 bucket: self.bucket.clone(),
337 key,
338 ..Default::default()
339 };
340 match self.inner.client.get_object(request.clone()).await {
341 Ok(o) => {
342 let key = request.key;
343 Ok(Some((key, o)))
344 }
345 Err(e) => Err(e.into()),
346 }
347 }
348 None => Ok(None),
349 }
350 }
351
352 pub async fn retrieve_next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
354 let next = self.inner.next_object().await?;
355 self.retrieve(next).await
356 }
357
358 #[inline]
359 pub async fn next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
360 let next = self.inner.next_object().await?;
361 self.retrieve(next).await
362 }
363
364 #[inline]
365 pub async fn count(self) -> Result<usize, S3ExtError> {
367 self.inner.count().await.map_err(|e| e.into())
368 }
369
370 #[inline]
371 pub async fn last(mut self) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
373 let last = self.inner.last_internal().await?;
374 self.retrieve(last).await
375 }
376
377 #[inline]
378 pub async fn nth(&mut self, n: usize) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
380 let nth = self.inner.nth(n).await?;
381 self.retrieve(nth).await
382 }
383}
384
/// Outcome of a single `GetObject` request.
type GetObjResult = RusotoResult<GetObjectOutput, GetObjectError>;
/// Boxed, sendable future resolving to one downloaded object.
type NextGetObjFuture = Pin<Box<dyn Future<Output = GetObjResult> + Send>>;
387
/// `Stream` adaptor around a [`GetObjectIter`] that yields each listed
/// object's key together with its `GetObject` response.
pub struct GetObjectStream {
    /// Cursor holding the listing state and the bucket name.
    iter: GetObjectIter,
    /// Listed object staged for download.
    next: Option<Object>,
    /// Key of the object whose download is currently in flight.
    key: Option<String>,
    /// Listing request currently in flight, if any.
    fut0: Option<NextObjFuture>,
    /// Download request currently in flight, if any.
    fut1: Option<NextGetObjFuture>,
}
396
397impl GetObjectStream {
398 pub(crate) fn new(
399 client: &S3Client,
400 bucket: impl Into<String>,
401 prefix: Option<impl Into<String>>,
402 ) -> Self {
403 Self {
404 iter: GetObjectIter::new(client, bucket, prefix),
405 next: None,
406 key: None,
407 fut0: None,
408 fut1: None,
409 }
410 }
411
412 pub fn get_iter(&self) -> &GetObjectIter {
414 &self.iter
415 }
416
417 pub fn into_iter(self) -> GetObjectIter {
419 self.iter
420 }
421
422 pub fn get_inner(&self) -> &ObjectIter {
424 &self.iter.inner
425 }
426
427 pub fn into_inner(self) -> ObjectIter {
429 self.iter.inner
430 }
431
432 async fn get_object(
433 client: S3Client,
434 request: GetObjectRequest,
435 ) -> RusotoResult<GetObjectOutput, GetObjectError> {
436 client.get_object(request).await
437 }
438}
439
440impl Stream for GetObjectStream {
441 type Item = S3ExtResult<(String, GetObjectOutput)>;
442 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
443 if self.as_mut().fut0.is_none() && self.as_mut().fut1.is_none() {
444 if let Some(object) = self.as_mut().iter.inner.objects.next() {
445 self.as_mut().next.replace(object);
446 } else if self.as_mut().iter.inner.exhausted {
447 return Poll::Ready(None);
448 } else {
449 let client = self.as_mut().iter.inner.client.clone();
450 let request = self.as_mut().iter.inner.request.clone();
451 self.as_mut()
452 .fut0
453 .replace(Box::pin(ObjectStream::get_objects(client, request)));
454 }
455 }
456
457 assert!(!(self.as_mut().fut0.is_some() && self.as_mut().fut1.is_some()));
458
459 if self.as_mut().fut0.is_some() {
460 let result = ready!(self.as_mut().fut0.as_mut().unwrap().poll_unpin(cx));
461 self.as_mut().fut0.take();
462
463 match result {
464 Ok(resp) => self.as_mut().iter.inner.update_objects(resp),
465 Err(e) => return Poll::Ready(Some(Err(e.into()))),
466 }
467 match self.as_mut().iter.inner.objects.next() {
468 Some(next) => {
469 self.as_mut().next.replace(next);
470 }
471 None => return Poll::Ready(None),
472 }
473 }
474
475 if let Some(next) = self.as_mut().next.take() {
476 let key = if let Some(key) = next.key {
477 key
478 } else {
479 return Poll::Ready(Some(Err(S3ExtError::Other("response is missing key"))));
480 };
481 self.as_mut().key.replace(key.clone());
482 let client = self.as_mut().iter.inner.client.clone();
483 let request = GetObjectRequest {
484 bucket: self.as_mut().iter.bucket.clone(),
485 key,
486 ..Default::default()
487 };
488 self.as_mut()
489 .fut1
490 .replace(Box::pin(Self::get_object(client, request)));
491 }
492
493 assert!(self.as_mut().fut0.is_none());
494
495 if self.as_mut().fut1.is_some() {
496 let result = ready!(self.as_mut().fut1.as_mut().unwrap().poll_unpin(cx));
497 self.as_mut().fut1.take();
498 match result {
499 Ok(obj) => Poll::Ready(Some(Ok((self.as_mut().key.take().unwrap(), obj)))),
500 Err(e) => Poll::Ready(Some(Err(e.into()))),
501 }
502 } else {
503 panic!("We shouldn't ever get here...");
504 }
505 }
506}