use super::*;
use aws_sdk_dynamodb::operation::query::builders::QueryFluentBuilder;
/// Type-state wrapper around the AWS SDK [`QueryFluentBuilder`].
///
/// The request is configured through chained methods and only executed once
/// `.all()` or `.stream()` is called.
///
/// Type parameters (all are compile-time markers; none is stored at runtime):
///
/// * `TD` – table definition; supplies the table name and the key schema used
///   by the key condition.
/// * `T` – item type yielded by the `Typed` execution methods; defaults to `()`.
/// * `O` – output format: `Raw` executions yield `Item<TD>`, `Typed` executions
///   yield `T`.
/// * `F` – filter state: `filter` is only offered while this is `NoFilter` and
///   moves the request to `AlreadyHasFilter`.
/// * `P` – projection state: `project` is only offered while this is
///   `NoProjection`; it moves the request to `AlreadyHasProjection` and resets
///   the output format to `Raw`.
#[must_use = "builder does nothing until executed via .all() or .stream()"]
pub struct QueryRequest<
TD: TableDefinition,
T = (),
O: OutputFormat = Raw,
F: FilterState = NoFilter,
P: ProjectionState = NoProjection,
> {
// Underlying SDK builder that accumulates every request parameter.
builder: QueryFluentBuilder,
// Zero-sized marker tying the otherwise unused type parameters to the struct.
_marker: PhantomData<(TD, T, O, F, P)>,
}
// Public constructors. They are defined on `QueryRequest<TD>`, i.e. with every
// other type parameter left at its default (`()`, `Raw`, `NoFilter`,
// `NoProjection`).
impl<TD: TableDefinition> QueryRequest<TD> {
    /// Starts a query against the table described by `TD`.
    ///
    /// `key_condition` is expressed over the table's own key schema
    /// (`TD::KeySchema`).
    pub fn new(
        client: aws_sdk_dynamodb::Client,
        key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
    ) -> Self {
        Self::_new(client, key_condition)
    }

    /// Starts a query against index `I` of the table described by `TD`.
    ///
    /// `key_condition` is expressed over the index's key schema
    /// (`I::KeySchema`) rather than the table's.
    pub fn new_index<I: IndexDefinition<TD>>(
        client: aws_sdk_dynamodb::Client,
        key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
    ) -> Self {
        Self::_new_index::<I>(client, key_condition)
    }
}
// Construction helpers and options that are available in every type state.
impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState, P: ProjectionState>
    QueryRequest<TD, T, O, F, P>
{
    /// Builds a table query in an arbitrary type state.
    ///
    /// Logs the table name and key condition at debug level, then applies the
    /// key condition to a fresh SDK query builder targeting `TD`'s table.
    pub(super) fn _new(
        client: aws_sdk_dynamodb::Client,
        key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
    ) -> Self {
        let table_name = TD::table_name();
        tracing::debug!(table_name, %key_condition, "Query");

        let request = client.query().table_name(table_name);
        let builder = key_condition.apply_key_condition(request);
        Self {
            builder,
            _marker: PhantomData,
        }
    }

    /// Builds an index query in an arbitrary type state.
    ///
    /// Same as [`Self::_new`], but the SDK builder additionally targets index
    /// `I` and the key condition is expressed over `I::KeySchema`.
    pub(super) fn _new_index<I: IndexDefinition<TD>>(
        client: aws_sdk_dynamodb::Client,
        key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
    ) -> Self {
        let table_name = TD::table_name();
        let index_name = I::index_name();
        tracing::debug!(table_name, index_name, %key_condition, "Query (index)");

        let request = client
            .query()
            .table_name(table_name)
            .index_name(index_name);
        let builder = key_condition.apply_key_condition(request);
        Self {
            builder,
            _marker: PhantomData,
        }
    }

    /// Sets `consistent_read(true)` on the underlying SDK builder.
    ///
    /// NOTE(review): this method is also reachable on index queries; DynamoDB
    /// documents that strongly consistent reads are not supported on global
    /// secondary indexes — confirm callers avoid that combination.
    pub fn consistent_read(self) -> Self {
        tracing::debug!("Query consistent_read");
        let Self { builder, _marker } = self;
        Self {
            builder: builder.consistent_read(true),
            _marker,
        }
    }

    /// Forwards `limit` to the underlying SDK builder's `limit` setting.
    pub fn limit(self, limit: i32) -> Self {
        tracing::debug!(limit, "Query limit");
        let Self { builder, _marker } = self;
        Self {
            builder: builder.limit(limit),
            _marker,
        }
    }

    /// Sets `scan_index_forward(false)` on the underlying SDK builder.
    pub fn reverse(self) -> Self {
        tracing::debug!("Query scan_index_forward = false");
        let Self { builder, _marker } = self;
        Self {
            builder: builder.scan_index_forward(false),
            _marker,
        }
    }

    /// Consumes the request and hands back the underlying SDK builder,
    /// discarding the type-state markers.
    pub fn into_inner(self) -> QueryFluentBuilder {
        let Self { builder, .. } = self;
        builder
    }
}
// `filter` exists only while the filter state is `NoFilter`, so it cannot be
// called a second time on the request it returns.
impl<TD: TableDefinition, T, O: OutputFormat, P: ProjectionState>
    QueryRequest<TD, T, O, NoFilter, P>
{
    /// Applies `filter` to the underlying SDK builder and moves the request to
    /// the `AlreadyHasFilter` state.
    ///
    /// The condition is logged at debug level before it is applied.
    pub fn filter(self, filter: Condition<'_>) -> QueryRequest<TD, T, O, AlreadyHasFilter, P> {
        tracing::debug!(%filter, "Query filter");
        let Self { builder, .. } = self;
        QueryRequest {
            builder: filter.apply_filter(builder),
            _marker: PhantomData,
        }
    }
}
// `project` exists only while the projection state is `NoProjection`, so it
// cannot be called a second time on the request it returns.
impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState>
    QueryRequest<TD, T, O, F, NoProjection>
{
    /// Applies `projection` to the underlying SDK builder and moves the request
    /// to the `AlreadyHasProjection` state.
    ///
    /// The returned request always has the `Raw` output format, whatever the
    /// format was before the call. The projection is logged at debug level
    /// before it is applied.
    pub fn project(
        self,
        projection: Projection<'_, TD>,
    ) -> QueryRequest<TD, T, Raw, F, AlreadyHasProjection> {
        tracing::debug!(%projection, "Query project");
        let Self { builder, .. } = self;
        QueryRequest {
            builder: projection.apply_projection(builder),
            _marker: PhantomData,
        }
    }
}
// Conversion from the `Typed` output format back to `Raw`.
impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Typed, F, P> {
    /// Switches the output format to `Raw`.
    ///
    /// The underlying SDK builder is carried over untouched; only the
    /// type-state marker changes.
    pub fn raw(self) -> QueryRequest<TD, T, Raw, F, P> {
        let Self { builder, .. } = self;
        QueryRequest {
            builder,
            _marker: PhantomData,
        }
    }
}
impl<
TD: TableDefinition,
T: DynamoDBItem<TD> + DeserializeOwned,
F: FilterState,
P: ProjectionState,
> QueryRequest<TD, T, Typed, F, P>
{
#[tracing::instrument(level = "debug", skip(self), name = "query_all")]
pub async fn all(self) -> Result<Vec<T>> {
dynamodb_execute_query(self.builder)
.await?
.into_iter()
.map(T::try_from_item)
.collect()
}
pub fn stream(self) -> impl Stream<Item = Result<Vec<T>>> {
dynamodb_stream_query::<TD>(self.builder).map(|result| {
result.and_then(|items| items.into_iter().map(T::try_from_item).collect())
})
}
}
// Execution methods for the `Raw` output format: items are returned as
// `Item<TD>` without conversion.
impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Raw, F, P> {
    /// Executes the query via `dynamodb_execute_query` and returns the items
    /// as they come back.
    ///
    /// # Errors
    ///
    /// Propagates the error from executing the query.
    #[tracing::instrument(level = "debug", skip(self), name = "query_all_raw")]
    pub async fn all(self) -> Result<Vec<Item<TD>>> {
        let Self { builder, .. } = self;
        dynamodb_execute_query(builder).await
    }

    /// Executes the query via `dynamodb_stream_query`, yielding batches of raw
    /// items.
    pub fn stream(self) -> impl Stream<Item = Result<Vec<Item<TD>>>> {
        let Self { builder, .. } = self;
        dynamodb_stream_query(builder)
    }
}