use crate::error::{S3ExtError, S3ExtResult};
use futures::{
ready,
stream::Stream,
task::{Context, Poll},
FutureExt,
};
use rusoto_core::{RusotoError, RusotoResult};
use rusoto_s3::{
GetObjectError, GetObjectOutput, GetObjectRequest, ListObjectsV2Error, ListObjectsV2Output,
ListObjectsV2Request, Object, S3Client, S3,
};
use std::{future::Future, mem, pin::Pin, vec::IntoIter};
#[derive(Clone)]
pub struct ObjectIter {
client: S3Client,
request: ListObjectsV2Request,
objects: IntoIter<Object>,
exhausted: bool,
}
impl ObjectIter {
fn new(
client: &S3Client,
bucket: impl Into<String>,
prefix: Option<impl Into<String>>,
) -> Self {
let request = ListObjectsV2Request {
bucket: bucket.into(),
max_keys: Some(1000),
prefix: prefix.map(|s| s.into()),
..Default::default()
};
ObjectIter {
client: client.clone(),
request,
objects: Vec::new().into_iter(),
exhausted: false,
}
}
async fn next_objects(&mut self) -> RusotoResult<(), ListObjectsV2Error> {
let resp = self.client.list_objects_v2(self.request.clone()).await?;
self.update_objects(resp);
Ok(())
}
fn update_objects(&mut self, resp: ListObjectsV2Output) {
self.objects = resp.contents.unwrap_or_default().into_iter();
match resp.next_continuation_token {
next @ Some(_) => self.request.continuation_token = next,
None => self.exhausted = true,
};
}
async fn last_internal(&mut self) -> RusotoResult<Option<Object>, ListObjectsV2Error> {
let mut objects = mem::replace(&mut self.objects, Vec::new().into_iter());
while !self.exhausted {
self.next_objects().await?;
if self.objects.len() > 0 {
objects = mem::replace(&mut self.objects, Vec::new().into_iter());
}
}
Ok(objects.last())
}
pub async fn next_object(&mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
if let object @ Some(_) = self.objects.next() {
Ok(object)
} else if self.exhausted {
Ok(None)
} else {
self.next_objects().await?;
Ok(self.objects.next())
}
}
pub async fn count(mut self) -> Result<usize, RusotoError<ListObjectsV2Error>> {
let mut count = self.objects.len();
while !self.exhausted {
self.next_objects().await?;
count += self.objects.len();
}
Ok(count)
}
pub async fn last(mut self) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
self.last_internal().await
}
pub async fn nth(
&mut self,
mut n: usize,
) -> Result<Option<Object>, RusotoError<ListObjectsV2Error>> {
while self.objects.len() <= n && !self.exhausted {
n -= self.objects.len();
self.next_objects().await?;
}
Ok(self.objects.nth(n))
}
}
type ObjResult = RusotoResult<ListObjectsV2Output, ListObjectsV2Error>;
type NextObjFuture = Pin<Box<dyn Future<Output = ObjResult> + Send>>;
pub struct ObjectStream {
iter: ObjectIter,
fut: Option<NextObjFuture>,
}
impl ObjectStream {
pub(crate) fn new(
client: &S3Client,
bucket: impl Into<String>,
prefix: Option<impl Into<String>>,
) -> Self {
Self {
iter: ObjectIter::new(client, bucket, prefix),
fut: None,
}
}
pub fn get_iter(&self) -> &ObjectIter {
&self.iter
}
pub fn into_iter(self) -> ObjectIter {
self.iter
}
async fn get_objects(
client: S3Client,
request: ListObjectsV2Request,
) -> RusotoResult<ListObjectsV2Output, ListObjectsV2Error> {
client.list_objects_v2(request).await
}
}
impl Stream for ObjectStream {
type Item = RusotoResult<Object, ListObjectsV2Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.as_mut().fut.is_none() {
if let Some(object) = self.as_mut().iter.objects.next() {
return Poll::Ready(Some(Ok(object)));
} else if self.as_mut().iter.exhausted {
return Poll::Ready(None);
}
let client = self.as_mut().iter.client.clone();
let request = self.as_mut().iter.request.clone();
self.as_mut()
.fut
.replace(Box::pin(Self::get_objects(client, request)));
}
let result = ready!(self.as_mut().fut.as_mut().unwrap().poll_unpin(cx));
self.as_mut().fut.take();
match result {
Ok(resp) => self.as_mut().iter.update_objects(resp),
Err(e) => return Poll::Ready(Some(Err(e))),
}
self.as_mut()
.iter
.objects
.next()
.map_or(Poll::Ready(None), |object| Poll::Ready(Some(Ok(object))))
}
}
#[derive(Clone)]
pub struct GetObjectIter {
inner: ObjectIter,
bucket: String,
}
impl GetObjectIter {
fn new(
client: &S3Client,
bucket: impl Into<String>,
prefix: Option<impl Into<String>>,
) -> Self {
let bucket = bucket.into();
GetObjectIter {
inner: ObjectIter::new(client, &bucket, prefix),
bucket,
}
}
async fn retrieve(
&mut self,
object: Option<Object>,
) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
match object {
Some(object) => {
let key = object
.key
.ok_or(S3ExtError::Other("response is missing key"))?;
let request = GetObjectRequest {
bucket: self.bucket.clone(),
key,
..Default::default()
};
match self.inner.client.get_object(request.clone()).await {
Ok(o) => {
let key = request.key;
Ok(Some((key, o)))
}
Err(e) => Err(e.into()),
}
}
None => Ok(None),
}
}
pub async fn retrieve_next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
let next = self.inner.next_object().await?;
self.retrieve(next).await
}
#[inline]
pub async fn next(&mut self) -> S3ExtResult<Option<(String, GetObjectOutput)>> {
let next = self.inner.next_object().await?;
self.retrieve(next).await
}
#[inline]
pub async fn count(self) -> Result<usize, S3ExtError> {
self.inner.count().await.map_err(|e| e.into())
}
#[inline]
pub async fn last(mut self) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
let last = self.inner.last_internal().await?;
self.retrieve(last).await
}
#[inline]
pub async fn nth(&mut self, n: usize) -> Result<Option<(String, GetObjectOutput)>, S3ExtError> {
let nth = self.inner.nth(n).await?;
self.retrieve(nth).await
}
}
type GetObjResult = RusotoResult<GetObjectOutput, GetObjectError>;
type NextGetObjFuture = Pin<Box<dyn Future<Output = GetObjResult> + Send>>;
pub struct GetObjectStream {
iter: GetObjectIter,
next: Option<Object>,
key: Option<String>,
fut0: Option<NextObjFuture>,
fut1: Option<NextGetObjFuture>,
}
impl GetObjectStream {
pub(crate) fn new(
client: &S3Client,
bucket: impl Into<String>,
prefix: Option<impl Into<String>>,
) -> Self {
Self {
iter: GetObjectIter::new(client, bucket, prefix),
next: None,
key: None,
fut0: None,
fut1: None,
}
}
pub fn get_iter(&self) -> &GetObjectIter {
&self.iter
}
pub fn into_iter(self) -> GetObjectIter {
self.iter
}
pub fn get_inner(&self) -> &ObjectIter {
&self.iter.inner
}
pub fn into_inner(self) -> ObjectIter {
self.iter.inner
}
async fn get_object(
client: S3Client,
request: GetObjectRequest,
) -> RusotoResult<GetObjectOutput, GetObjectError> {
client.get_object(request).await
}
}
impl Stream for GetObjectStream {
type Item = S3ExtResult<(String, GetObjectOutput)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.as_mut().fut0.is_none() && self.as_mut().fut1.is_none() {
if let Some(object) = self.as_mut().iter.inner.objects.next() {
self.as_mut().next.replace(object);
} else if self.as_mut().iter.inner.exhausted {
return Poll::Ready(None);
} else {
let client = self.as_mut().iter.inner.client.clone();
let request = self.as_mut().iter.inner.request.clone();
self.as_mut()
.fut0
.replace(Box::pin(ObjectStream::get_objects(client, request)));
}
}
assert!(!(self.as_mut().fut0.is_some() && self.as_mut().fut1.is_some()));
if self.as_mut().fut0.is_some() {
let result = ready!(self.as_mut().fut0.as_mut().unwrap().poll_unpin(cx));
self.as_mut().fut0.take();
match result {
Ok(resp) => self.as_mut().iter.inner.update_objects(resp),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
}
match self.as_mut().iter.inner.objects.next() {
Some(next) => {
self.as_mut().next.replace(next);
}
None => return Poll::Ready(None),
}
}
if let Some(next) = self.as_mut().next.take() {
let key = if let Some(key) = next.key {
key
} else {
return Poll::Ready(Some(Err(S3ExtError::Other("response is missing key"))));
};
self.as_mut().key.replace(key.clone());
let client = self.as_mut().iter.inner.client.clone();
let request = GetObjectRequest {
bucket: self.as_mut().iter.bucket.clone(),
key,
..Default::default()
};
self.as_mut()
.fut1
.replace(Box::pin(Self::get_object(client, request)));
}
assert!(self.as_mut().fut0.is_none());
if self.as_mut().fut1.is_some() {
let result = ready!(self.as_mut().fut1.as_mut().unwrap().poll_unpin(cx));
self.as_mut().fut1.take();
match result {
Ok(obj) => Poll::Ready(Some(Ok((self.as_mut().key.take().unwrap(), obj)))),
Err(e) => Poll::Ready(Some(Err(e.into()))),
}
} else {
panic!("We shouldn't ever get here...");
}
}
}