use crate::dynamodb::{
AttributeValue, BackupSummary, DynamoDb, ListBackupsError, ListBackupsInput, ListTablesError,
ListTablesInput, QueryError, QueryInput, ScanError, ScanInput,
};
use futures::{stream, Stream, TryStreamExt};
#[cfg(feature = "default")]
use rusoto_core_default::RusotoError;
#[cfg(feature = "rustls")]
use rusoto_core_rustls::RusotoError;
use std::{collections::HashMap, pin::Pin};
type DynomiteStream<I, E> = Pin<Box<dyn Stream<Item = Result<I, RusotoError<E>>> + Send>>;
pub trait DynamoDbExt {
fn list_backups_pages(
self,
input: ListBackupsInput,
) -> DynomiteStream<BackupSummary, ListBackupsError>;
fn list_tables_pages(
self,
input: ListTablesInput,
) -> DynomiteStream<String, ListTablesError>;
fn query_pages(
self,
input: QueryInput,
) -> DynomiteStream<HashMap<String, AttributeValue>, QueryError>;
fn scan_pages(
self,
input: ScanInput,
) -> DynomiteStream<HashMap<String, AttributeValue>, ScanError>;
}
impl<D> DynamoDbExt for D
where
D: DynamoDb + Clone + Send + Sync + 'static,
{
fn list_backups_pages(
self,
input: ListBackupsInput,
) -> DynomiteStream<BackupSummary, ListBackupsError> {
enum PageState {
Next(Option<String>, ListBackupsInput),
End,
}
Box::pin(
stream::try_unfold(
PageState::Next(input.exclusive_start_backup_arn.clone(), input),
move |state| {
let clone = self.clone();
async move {
let (exclusive_start_backup_arn, input) = match state {
PageState::Next(start, input) => (start, input),
PageState::End => {
return Ok(None) as Result<_, RusotoError<ListBackupsError>>
}
};
let resp = clone
.list_backups(ListBackupsInput {
exclusive_start_backup_arn,
..input.clone()
})
.await?;
let next_state = match resp
.last_evaluated_backup_arn
.filter(|next| !next.is_empty())
{
Some(next) => PageState::Next(Some(next), input),
_ => PageState::End,
};
Ok(Some((
stream::iter(
resp.backup_summaries
.unwrap_or_default()
.into_iter()
.map(Ok),
),
next_state,
)))
}
},
)
.try_flatten(),
)
}
fn list_tables_pages(
self,
input: ListTablesInput,
) -> DynomiteStream<String, ListTablesError> {
enum PageState {
Next(Option<String>, ListTablesInput),
End,
}
Box::pin(
stream::try_unfold(
PageState::Next(input.exclusive_start_table_name.clone(), input),
move |state| {
let clone = self.clone();
async move {
let (exclusive_start_table_name, input) = match state {
PageState::Next(start, input) => (start, input),
PageState::End => {
return Ok(None) as Result<_, RusotoError<ListTablesError>>
}
};
let resp = clone
.list_tables(ListTablesInput {
exclusive_start_table_name,
..input.clone()
})
.await?;
let next_state = match resp
.last_evaluated_table_name
.filter(|next| !next.is_empty())
{
Some(next) => PageState::Next(Some(next), input),
_ => PageState::End,
};
Ok(Some((
stream::iter(resp.table_names.unwrap_or_default().into_iter().map(Ok)),
next_state,
)))
}
},
)
.try_flatten(),
)
}
fn query_pages(
self,
input: QueryInput,
) -> DynomiteStream<HashMap<String, AttributeValue>, QueryError> {
#[allow(clippy::large_enum_variant)]
enum PageState {
Next(Option<HashMap<String, AttributeValue>>, QueryInput),
End,
}
Box::pin(
stream::try_unfold(
PageState::Next(input.exclusive_start_key.clone(), input),
move |state| {
let clone = self.clone();
async move {
let (exclusive_start_key, input) = match state {
PageState::Next(start, input) => (start, input),
PageState::End => {
return Ok(None) as Result<_, RusotoError<QueryError>>
}
};
let resp = clone
.query(QueryInput {
exclusive_start_key,
..input.clone()
})
.await?;
let next_state =
match resp.last_evaluated_key.filter(|next| !next.is_empty()) {
Some(next) => PageState::Next(Some(next), input),
_ => PageState::End,
};
Ok(Some((
stream::iter(resp.items.unwrap_or_default().into_iter().map(Ok)),
next_state,
)))
}
},
)
.try_flatten(),
)
}
fn scan_pages(
self,
input: ScanInput,
) -> DynomiteStream<HashMap<String, AttributeValue>, ScanError> {
#[allow(clippy::large_enum_variant)]
enum PageState {
Next(Option<HashMap<String, AttributeValue>>, ScanInput),
End,
}
Box::pin(
stream::try_unfold(
PageState::Next(input.exclusive_start_key.clone(), input),
move |state| {
let clone = self.clone();
async move {
let (exclusive_start_key, input) = match state {
PageState::Next(start, input) => (start, input),
PageState::End => return Ok(None) as Result<_, RusotoError<ScanError>>,
};
let resp = clone
.scan(ScanInput {
exclusive_start_key,
..input.clone()
})
.await?;
let next_state =
match resp.last_evaluated_key.filter(|next| !next.is_empty()) {
Some(next) => PageState::Next(Some(next), input),
_ => PageState::End,
};
Ok(Some((
stream::iter(resp.items.unwrap_or_default().into_iter().map(Ok)),
next_state,
)))
}
},
)
.try_flatten(),
)
}
}