use aws_sdk_dynamodb::operation;
use aws_sdk_dynamodb::operation::delete_item::DeleteItemOutput;
use aws_sdk_dynamodb::operation::put_item::PutItemOutput;
use aws_sdk_dynamodb::operation::query::builders::QueryFluentBuilder;
use aws_sdk_dynamodb::operation::update_item::UpdateItemOutput;
use aws_sdk_dynamodb::types::{AttributeValue, ReturnConsumedCapacity, ReturnValue, Select};
use aws_smithy_types_convert::stream::PaginationStreamExt;
use futures_util::TryStreamExt;
use serde::{Serialize, de::DeserializeOwned};
use serde_dynamo::{from_item, to_item};
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::time::Duration;
use tokio_stream::Stream;
use crate::assert_not_reserved_key;
use crate::error::Error;
use crate::table::batch::{BatchReadOutput, BatchWriteOutput, batch_get, batch_write};
use crate::table::helpers::{query_builder, validation};
use crate::table::types::{CompositeKey, OutputItems, RetryConfig};
/// Builds the expression-attribute-name alias for the `index`-th field,
/// e.g. `#n0`, `#n1`, ... — used by the update/increment/remove helpers.
#[inline]
fn attribute_name_alias(index: usize) -> String {
    let mut alias = String::with_capacity(4);
    alias.push_str("#n");
    alias.push_str(&index.to_string());
    alias
}
/// Core abstraction over a DynamoDB-backed record type.
///
/// Implementors declare their table layout through the associated constants
/// (`TABLE`, `PARTITION_KEY`, optional `SORT_KEY`) and the key accessors, and
/// inherit a complete CRUD / query / batch API as default methods. Every
/// default method delegates to the free functions defined in this module.
pub trait DynamoTable: Serialize + DeserializeOwned + Send + Sync
where
    Self::PK: fmt::Display + Clone + Send + Sync + fmt::Debug + DeserializeOwned,
    Self::SK: fmt::Display + Clone + Send + Sync + fmt::Debug + DeserializeOwned,
{
    /// Rust type of the partition (hash) key value.
    type PK;
    /// Rust type of the sort (range) key value.
    type SK;
    /// DynamoDB table name.
    const TABLE: &'static str;
    /// Attribute name of the partition key.
    const PARTITION_KEY: &'static str;
    /// Attribute name of the sort key; `None` for simple-key tables.
    const SORT_KEY: Option<&'static str> = None;
    /// Page size applied by query/scan helpers when the caller passes `None`.
    const DEFAULT_PAGE_SIZE: u16 = 10;
    /// Retry/backoff policy used by the batch helpers.
    const BATCH_RETRIES_CONFIG: RetryConfig = RetryConfig {
        max_retries: 2,
        initial_delay: Duration::from_millis(100),
        max_delay: Duration::from_millis(2000),
    };
    /// Instance-level accessor for [`Self::PARTITION_KEY`].
    fn partition_key_name(&self) -> &'static str {
        Self::PARTITION_KEY
    }
    /// Instance-level accessor for [`Self::SORT_KEY`].
    fn sort_key_name(&self) -> Option<&'static str> {
        Self::SORT_KEY
    }
    /// The partition key value of this record.
    fn partition_key(&self) -> Self::PK;
    /// The sort key value of this record; defaults to `None` for
    /// simple-key tables.
    fn sort_key(&self) -> Option<Self::SK> {
        None
    }
    /// Full `(partition, sort)` key pair of this record.
    fn composite_key(&self) -> CompositeKey<Self::PK, Self::SK> {
        (self.partition_key(), self.sort_key())
    }
    /// Shared DynamoDB client (process-wide, provided by the crate root).
    fn dynamodb_client() -> impl Future<Output = &'static aws_sdk_dynamodb::Client> {
        crate::dynamodb_client()
    }
    /// Inserts or fully replaces this record (`PutItem`).
    fn add_item(&self) -> impl Future<Output = Result<PutItemOutput, Error>> {
        add_item::<Self>(self)
    }
    /// Point lookup by exact key; resolves to `Ok(None)` when absent.
    fn get_item(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
    ) -> impl Future<Output = Result<Option<Self>, Error>> {
        get_item::<Self>(partition_key, sort_key)
    }
    /// One page of items under `partition_key`, in ascending sort-key order
    /// (`scan_index_forward = true`).
    fn query_items(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
        limit: Option<u16>,
        exclusive_start_key: Option<&Self::SK>,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        _query_items(
            partition_key,
            sort_key,
            exclusive_start_key,
            limit,
            None,
            true,
        )
    }
    /// Ascending query with an additional server-side filter expression.
    /// The filter runs after the key query, so `limit` bounds examined
    /// items, not matched items.
    fn query_items_with_filter<U: Serialize>(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
        limit: Option<u16>,
        exclusive_start_key: Option<&Self::SK>,
        filter_expression: String,
        filter_expression_values: U,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        query_items_with_filter::<Self, U>(
            partition_key,
            sort_key,
            exclusive_start_key,
            limit,
            None,
            true,
            filter_expression,
            filter_expression_values,
        )
    }
    /// Same as [`Self::query_items`] but in descending sort-key order.
    fn reverse_query_items(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
        limit: Option<u16>,
        exclusive_start_key: Option<&Self::SK>,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        _query_items(
            partition_key,
            sort_key,
            exclusive_start_key,
            limit,
            None,
            false,
        )
    }
    /// Descending query with an additional server-side filter expression.
    fn reverse_query_items_with_filter<U: Serialize>(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
        limit: Option<u16>,
        exclusive_start_key: Option<&Self::SK>,
        filter_expression: String,
        filter_expression_values: U,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        query_items_with_filter::<Self, U>(
            partition_key,
            sort_key,
            exclusive_start_key,
            limit,
            None,
            false,
            filter_expression,
            filter_expression_values,
        )
    }
    /// Fetches at most one item under `partition_key` via a Query
    /// (useful when the sort key value is unknown).
    fn query_item(partition_key: &Self::PK) -> impl Future<Output = Result<Option<Self>, Error>> {
        query_item::<Self>(partition_key)
    }
    /// Deletes this record by its own composite key, consuming it.
    fn destroy_item(self) -> impl Future<Output = Result<DeleteItemOutput, Error>> {
        let partition_key = self.partition_key();
        let sort_key = self.sort_key();
        delete_item::<Self>(partition_key, sort_key)
    }
    /// Deletes the record stored under the given key.
    fn delete_item(
        partition_key: Self::PK,
        sort_key: Option<Self::SK>,
    ) -> impl Future<Output = Result<DeleteItemOutput, Error>> {
        delete_item::<Self>(partition_key, sort_key)
    }
    /// Partially updates this record: each top-level field of `update`
    /// becomes one `SET` clause.
    fn update_item<T: Serialize + Send>(
        &self,
        update: T,
    ) -> impl Future<Output = Result<UpdateItemOutput, Error>> {
        let partition_key = self.partition_key();
        let sort_key = self.sort_key();
        update_item::<Self, T>(partition_key, sort_key, update)
    }
    /// Counts items under `partition_key` (Select::Count query).
    fn count_items(partition_key: &Self::PK) -> impl Future<Output = Result<usize, Error>> {
        count_items::<Self>(partition_key)
    }
    /// One page of a full-table scan.
    fn scan_items(
        limit: Option<u16>,
        exclusive_start_key: Option<CompositeKey<Self::PK, Self::SK>>,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        scan_items::<Self>(limit, exclusive_start_key)
    }
    /// One page of a full-table scan with a server-side filter.
    fn scan_items_with_filter<U: Serialize>(
        limit: Option<u16>,
        exclusive_start_key: Option<CompositeKey<Self::PK, Self::SK>>,
        filter_expression: String,
        filter_expression_values: U,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        scan_items_with_filter::<Self, U>(
            limit,
            exclusive_start_key,
            filter_expression,
            filter_expression_values,
        )
    }
    /// Atomically increments several numeric attributes in one request.
    fn increment_multiple(
        partition_key: &Self::PK,
        sort_key: Option<&Self::SK>,
        fields: &[(&str, u64)],
    ) -> impl Future<Output = Result<UpdateItemOutput, Error>> {
        increment_multiple::<Self>(partition_key, sort_key, fields)
    }
    /// Removes the named attributes from this record (`REMOVE` update).
    fn remove_attributes(
        &self,
        attributes: &[&str],
    ) -> impl Future<Output = Result<UpdateItemOutput, Error>> {
        let partition_key = self.partition_key();
        let sort_key = self.sort_key();
        remove_attributes::<Self>(partition_key, sort_key, attributes)
    }
    /// Partial update guarded by an optional DynamoDB condition expression.
    fn update_item_with_condition<U: Serialize + Send, C: Serialize>(
        &self,
        update: U,
        condition_expression: Option<String>,
        condition_expression_values: Option<C>,
    ) -> impl Future<Output = Result<UpdateItemOutput, Error>> {
        let partition_key = self.partition_key();
        let sort_key = self.sort_key();
        update_item_with_condition::<Self, U, C>(
            partition_key,
            sort_key,
            update,
            condition_expression,
            condition_expression_values,
        )
    }
    /// Batch-writes (puts) the given records, with retries per
    /// [`Self::BATCH_RETRIES_CONFIG`].
    fn batch_upsert(
        upsert: Vec<Self>,
    ) -> impl Future<Output = Result<BatchWriteOutput<Self>, Error>>
    where
        Self: Clone,
    {
        batch_write(upsert, vec![])
    }
    /// Batch-deletes the given records.
    fn batch_delete(
        delete: Vec<Self>,
    ) -> impl Future<Output = Result<BatchWriteOutput<Self>, Error>>
    where
        Self: Clone,
    {
        batch_write(vec![], delete)
    }
    /// Batch-reads the records stored under the given composite keys.
    fn batch_get(
        values: Vec<CompositeKey<Self::PK, Self::SK>>,
    ) -> impl Future<Output = Result<BatchReadOutput<Self>, Error>>
    where
        Self::PK: DeserializeOwned,
        Self::SK: DeserializeOwned,
    {
        batch_get::<Self>(values)
    }
    /// Items whose sort key is BETWEEN `start_sort_key` and `end_sort_key`
    /// (inclusive on both ends). Panics if the table has no sort key.
    fn query_items_between(
        partition_key: &Self::PK,
        exclusive_start_key: Option<&Self::SK>,
        limit: Option<u16>,
        scan_index_forward: bool,
        start_sort_key: String,
        end_sort_key: String,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        query_items_between::<Self>(
            partition_key,
            exclusive_start_key,
            limit,
            None,
            scan_index_forward,
            start_sort_key,
            end_sort_key,
        )
    }
    /// Items where attribute `field_name` contains `field_value`
    /// (post-query filter).
    fn query_items_contains(
        partition_key: &Self::PK,
        exclusive_start_key: Option<&Self::SK>,
        limit: Option<u16>,
        scan_index_forward: bool,
        field_name: String,
        field_value: String,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        query_items_contains::<Self>(
            partition_key,
            exclusive_start_key,
            limit,
            None,
            scan_index_forward,
            field_name,
            field_value,
        )
    }
    /// Items whose sort key begins with `sort_key_prefix`. Panics if the
    /// table has no sort key.
    fn query_items_begins_with(
        partition_key: &Self::PK,
        exclusive_start_key: Option<&Self::SK>,
        limit: Option<u16>,
        scan_index_forward: bool,
        sort_key_prefix: String,
    ) -> impl Future<Output = Result<OutputItems<Self>, Error>> {
        query_items_begins_with::<Self>(
            partition_key,
            exclusive_start_key,
            limit,
            None,
            scan_index_forward,
            sort_key_prefix,
        )
    }
}
/// Inserts (or fully replaces) `payload` as an item in `T::TABLE`.
///
/// Serializes the entire struct with `serde_dynamo` and issues a `PutItem`
/// request; returns the raw SDK output on success.
pub async fn add_item<T>(payload: &T) -> Result<PutItemOutput, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    validation::validate_table_keys::<T>();
    // Serialize the full payload into a DynamoDB attribute map.
    let item: HashMap<String, AttributeValue> = to_item(payload)?;
    // Only request capacity reporting when the stats feature is enabled.
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let client = T::dynamodb_client().await;
    let output = client
        .put_item()
        .table_name(T::TABLE)
        .return_values(ReturnValue::None)
        .return_consumed_capacity(capacity_mode)
        .set_item(Some(item))
        .send()
        .await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
    Ok(output)
}
/// Point lookup of a single item by its exact key.
///
/// Returns `Ok(None)` when no item exists under the given key.
pub async fn get_item<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
) -> Result<Option<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    // A table with a declared sort key cannot be addressed by partition key alone.
    debug_assert!(
        !(T::SORT_KEY.is_some() && sort_key.is_none()),
        "get_item argument SORT_KEY is defined but sort_key argument not given"
    );
    validation::validate_table_keys::<T>();
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let mut request = T::dynamodb_client()
        .await
        .get_item()
        .table_name(T::TABLE)
        .return_consumed_capacity(capacity_mode)
        .key(
            T::PARTITION_KEY,
            AttributeValue::S(partition_key.to_string()),
        );
    // Address the full composite key when the table declares a sort key.
    if let (Some(sort_field), Some(sort_value)) = (T::SORT_KEY, sort_key) {
        request = request.key(sort_field, AttributeValue::S(sort_value.to_string()));
    }
    let response = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(response.consumed_capacity.as_ref());
    match response.item {
        Some(raw) => Ok(Some(from_item(raw)?)),
        None => Ok(None),
    }
}
/// One page of items under `partition_key`, in ascending sort-key order.
///
/// FIX: this previously passed `scan_index_forward = false`, silently
/// returning items in *descending* order — contradicting both the trait
/// method `DynamoTable::query_items` (which passes `true`) and the
/// existence of the dedicated `reverse_query_items` API. Now aligned with
/// the trait: forward scan.
pub async fn query_items<T>(
    partition_key: &T::PK,
    limit: Option<u16>,
    exclusive_start_key: Option<&T::SK>,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    _query_items::<T>(partition_key, None, exclusive_start_key, limit, None, true).await
}
/// Assembles the Query fluent builder shared by the paged query helpers
/// and the streaming variant, targeting either the table or a named index.
async fn _query_items_builder<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    exclusive_start_key: Option<&T::SK>,
    limit: u16,
    index_name: Option<String>,
    scan_index_forward: bool,
) -> QueryFluentBuilder
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    validation::validate_table_keys::<T>();
    let client = T::dynamodb_client().await;
    // Target a GSI/LSI when an index name is supplied, the base table otherwise.
    let builder = match index_name {
        Some(name) => query_builder::QueryBuilder::for_index::<T>(name),
        None => query_builder::QueryBuilder::for_table::<T>(),
    };
    let sort_value = sort_key.map(ToString::to_string);
    let start_after = exclusive_start_key.map(ToString::to_string);
    builder.build_query(
        client,
        partition_key.to_string(),
        sort_value,
        start_after,
        None,
        limit,
        scan_index_forward,
    )
}
/// One page of a key query with an additional server-side filter.
///
/// The filter runs *after* the key query, so `limit` bounds examined items,
/// not matched items. A `limit` of `Some(0)` short-circuits to an empty
/// page without touching DynamoDB.
#[allow(clippy::too_many_arguments)]
pub async fn query_items_with_filter<T, U>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
    filter_expression: String,
    filter_expression_values: U,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    U: Serialize,
{
    // Debug-only sanity check of the placeholder values.
    if cfg!(debug_assertions) {
        validation::validate_filter_expression_values(&filter_expression_values);
    }
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let values: HashMap<String, AttributeValue> = to_item(filter_expression_values)?;
    let mut request = _query_items_builder::<T>(
        partition_key,
        sort_key,
        exclusive_start_key,
        page_size,
        index_name,
        scan_index_forward,
    )
    .await
    .filter_expression(filter_expression);
    // Bind every serialized placeholder value onto the request.
    for (placeholder, value) in values {
        request = request.expression_attribute_values(placeholder, value);
    }
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
/// Queries items whose sort key lies BETWEEN `range_start` and `range_end`
/// (inclusive on both ends) under `partition_key`.
///
/// Panics if `T::SORT_KEY` is not defined. A `limit` of `Some(0)`
/// short-circuits to an empty page without touching DynamoDB.
#[allow(clippy::too_many_arguments)]
pub async fn query_items_between<T>(
    partition_key: &T::PK,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
    range_start: String,
    range_end: String,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let sort_key_field = T::SORT_KEY.expect("sort key required for between query");
    let hash_value = AttributeValue::S(partition_key.to_string());
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let key_condition = format!(
        "{} = :hash_value AND {sort_key_field} BETWEEN :range_start AND :range_end",
        T::PARTITION_KEY
    );
    let mut request = T::dynamodb_client()
        .await
        .query()
        .table_name(T::TABLE)
        .return_consumed_capacity(capacity_mode)
        .scan_index_forward(scan_index_forward)
        .limit(page_size as i32)
        .key_condition_expression(key_condition)
        .expression_attribute_values(":hash_value", hash_value.clone())
        .expression_attribute_values(":range_start", AttributeValue::S(range_start))
        .expression_attribute_values(":range_end", AttributeValue::S(range_end));
    if let Some(index) = index_name {
        request = request.index_name(index);
    }
    // Resume pagination from the caller-supplied composite start key.
    if let Some(start_sort) = exclusive_start_key {
        request = request
            .exclusive_start_key(T::PARTITION_KEY, hash_value)
            .exclusive_start_key(sort_key_field, AttributeValue::S(start_sort.to_string()));
    }
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
/// Queries items under `partition_key`, filtering to those where attribute
/// `field_name` `contains(...)` the string `field_value`.
///
/// FIX: `field_name` is now bound through an expression-attribute-name
/// alias (`#contains_attr`) rather than being spliced verbatim into the
/// filter expression. Splicing the raw name failed with a
/// `ValidationException` whenever the attribute was a DynamoDB reserved
/// word (e.g. `name`, `status`). The request is otherwise unchanged.
///
/// Note: `contains` runs as a *filter*, i.e. after the key query — `limit`
/// bounds examined items, not matched items.
#[allow(clippy::too_many_arguments)]
pub async fn query_items_contains<T>(
    partition_key: &T::PK,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
    field_name: String,
    field_value: String,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    if limit.map(|l| l == 0).unwrap_or(false) {
        return Ok(OutputItems::default());
    }
    let limit = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let mut builder = T::dynamodb_client()
        .await
        .query()
        .table_name(T::TABLE)
        .return_consumed_capacity(if cfg!(feature = "consumed_capacity_stats") {
            ReturnConsumedCapacity::Total
        } else {
            ReturnConsumedCapacity::None
        })
        .scan_index_forward(scan_index_forward)
        .limit(limit as i32)
        .key_condition_expression(format!("{} = :hash_value", T::PARTITION_KEY))
        // Alias the attribute name so DynamoDB reserved words are accepted.
        .filter_expression("contains(#contains_attr, :v_field_name)")
        .expression_attribute_names("#contains_attr", field_name)
        .expression_attribute_values(":hash_value", AttributeValue::S(partition_key.to_string()))
        .expression_attribute_values(":v_field_name", AttributeValue::S(field_value));
    if let Some(index_name) = index_name {
        builder = builder.index_name(index_name);
    }
    if let Some(sort_key) = exclusive_start_key {
        let sort_key_field = T::SORT_KEY.expect("sort key required for contains query");
        builder = builder
            .exclusive_start_key(
                T::PARTITION_KEY,
                AttributeValue::S(partition_key.to_string()),
            )
            .exclusive_start_key(sort_key_field, AttributeValue::S(sort_key.to_string()));
    }
    let result = builder.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, limit)))
}
/// Queries items whose sort key *begins with* `prefix` under `partition_key`.
///
/// NOTE(review): despite its name, this is functionally identical to
/// [`query_items_begins_with`] — DynamoDB key condition expressions support
/// `begins_with` but provide no `ends_with` operator, so suffix matching is
/// impossible server-side. Behavior is kept unchanged for backward
/// compatibility; prefer `query_items_begins_with` in new code, and verify
/// existing callers do not expect suffix semantics.
///
/// FIX: the `expect` message previously said "begins_with query", which
/// misattributed a missing-sort-key panic raised from this function.
#[allow(clippy::too_many_arguments)]
pub async fn query_items_ends_with<T>(
    partition_key: &T::PK,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
    prefix: String,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    if limit.map(|l| l == 0).unwrap_or(false) {
        return Ok(OutputItems::default());
    }
    let limit = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let sort_key_field = T::SORT_KEY.expect("sort key required for ends_with query");
    let mut builder = T::dynamodb_client()
        .await
        .query()
        .table_name(T::TABLE)
        .return_consumed_capacity(if cfg!(feature = "consumed_capacity_stats") {
            ReturnConsumedCapacity::Total
        } else {
            ReturnConsumedCapacity::None
        })
        .scan_index_forward(scan_index_forward)
        .limit(limit as i32)
        .key_condition_expression(format!(
            "{} = :hash_value AND begins_with({sort_key_field}, :sort_prefix)",
            T::PARTITION_KEY,
        ))
        .expression_attribute_values(":hash_value", AttributeValue::S(partition_key.to_string()))
        .expression_attribute_values(":sort_prefix", AttributeValue::S(prefix));
    if let Some(index_name) = index_name {
        builder = builder.index_name(index_name);
    }
    if let Some(sort_key) = exclusive_start_key {
        builder = builder
            .exclusive_start_key(
                T::PARTITION_KEY,
                AttributeValue::S(partition_key.to_string()),
            )
            .exclusive_start_key(sort_key_field, AttributeValue::S(sort_key.to_string()));
    }
    let result = builder.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, limit)))
}
/// Queries items whose sort key begins with `prefix` under `partition_key`.
///
/// Panics if `T::SORT_KEY` is not defined. A `limit` of `Some(0)`
/// short-circuits to an empty page without touching DynamoDB.
#[allow(clippy::too_many_arguments)]
pub async fn query_items_begins_with<T>(
    partition_key: &T::PK,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
    prefix: String,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let sort_key_field = T::SORT_KEY.expect("sort key required for begins_with query");
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let key_condition = format!(
        "{} = :hash_value AND begins_with({sort_key_field}, :sort_prefix)",
        T::PARTITION_KEY,
    );
    let mut request = T::dynamodb_client()
        .await
        .query()
        .table_name(T::TABLE)
        .return_consumed_capacity(capacity_mode)
        .scan_index_forward(scan_index_forward)
        .limit(page_size as i32)
        .key_condition_expression(key_condition)
        .expression_attribute_values(":hash_value", AttributeValue::S(partition_key.to_string()))
        .expression_attribute_values(":sort_prefix", AttributeValue::S(prefix));
    if let Some(index) = index_name {
        request = request.index_name(index);
    }
    // Resume pagination from the caller-supplied composite start key.
    if let Some(start_sort) = exclusive_start_key {
        request = request
            .exclusive_start_key(
                T::PARTITION_KEY,
                AttributeValue::S(partition_key.to_string()),
            )
            .exclusive_start_key(sort_key_field, AttributeValue::S(start_sort.to_string()));
    }
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
/// Shared implementation behind the paged query helpers: builds the query
/// via `_query_items_builder`, sends it, and wraps the result in a page.
async fn _query_items<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    // An explicit zero limit means "no work": skip the round trip entirely.
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let request = _query_items_builder::<T>(
        partition_key,
        sort_key,
        exclusive_start_key,
        page_size,
        index_name,
        scan_index_forward,
    )
    .await;
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
/// Fetches at most one item under `partition_key` via a Query with
/// `limit = 1` (descending scan), for when the sort key value is unknown.
pub async fn query_item<T>(partition_key: &T::PK) -> Result<Option<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    let mut page = _query_items::<T>(partition_key, None, None, Some(1), None, false).await?;
    Ok(page.items.pop())
}
/// Deletes the item stored under (`partition_key`, `sort_key`) and returns
/// the raw SDK output.
pub async fn delete_item<T>(
    partition_key: T::PK,
    sort_key: Option<T::SK>,
) -> Result<DeleteItemOutput, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    // A table with a declared sort key cannot be addressed by partition key alone.
    debug_assert!(
        !(T::SORT_KEY.is_some() && sort_key.is_none()),
        "delete_item argument SORT_KEY is defined but sort_key argument not given"
    );
    validation::validate_table_keys::<T>();
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let mut request = T::dynamodb_client()
        .await
        .delete_item()
        .table_name(T::TABLE)
        .return_consumed_capacity(capacity_mode)
        .key(
            T::PARTITION_KEY,
            AttributeValue::S(partition_key.to_string()),
        );
    if let (Some(sort_field), Some(sort_value)) = (T::SORT_KEY, sort_key) {
        request = request.key(sort_field, AttributeValue::S(sort_value.to_string()));
    }
    let output = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
    Ok(output)
}
/// Atomically increments a single numeric attribute; convenience wrapper
/// around [`increment_multiple`].
pub async fn increment<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    field: &str,
    increment_by: u64,
) -> Result<UpdateItemOutput, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    let fields = [(field, increment_by)];
    increment_multiple::<T>(partition_key, sort_key, &fields).await
}
/// Atomically increments several numeric attributes in a single
/// `UpdateItem` request (`SET #n0 = #n0 + :incr0, ...`).
///
/// An empty `fields` slice is a no-op and returns an empty output without
/// touching DynamoDB. Attribute names are aliased so reserved words are
/// accepted.
/// NOTE(review): `#nX = #nX + :incrX` presumes the attribute already exists
/// and is numeric — a missing attribute makes DynamoDB reject the update;
/// confirm callers initialize counters first.
pub async fn increment_multiple<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    fields: &[(&str, u64)],
) -> Result<UpdateItemOutput, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    // Nothing to do — avoid sending an invalid empty SET expression.
    if fields.is_empty() {
        return Ok(UpdateItemOutput::builder().build());
    }
    debug_assert!(
        !(T::SORT_KEY.is_some() && sort_key.is_none()),
        "increment argument SORT_KEY is defined but sort_key argument not given"
    );
    validation::validate_table_keys::<T>();
    // Debug-only: ensure field names don't collide with generated aliases.
    if cfg!(debug_assertions) {
        let field_names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();
        let aliases: Vec<String> = (0..field_names.len()).map(attribute_name_alias).collect();
        validation::validate_aliased_field_names(&field_names, &aliases);
    }
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let mut request = T::dynamodb_client()
        .await
        .update_item()
        .table_name(T::TABLE)
        .return_consumed_capacity(capacity_mode)
        .set_return_values(Some(ReturnValue::None))
        .key(
            T::PARTITION_KEY,
            AttributeValue::S(partition_key.to_string()),
        );
    // One aliased "name = name + :incrN" clause per field.
    let mut set_clauses: Vec<String> = Vec::with_capacity(fields.len());
    for (index, (name, amount)) in fields.iter().enumerate() {
        let name_alias = attribute_name_alias(index);
        set_clauses.push(format!("{name_alias} = {name_alias} + :incr{index}"));
        request = request
            .expression_attribute_names(&name_alias, *name)
            .expression_attribute_values(
                format!(":incr{index}"),
                AttributeValue::N(amount.to_string()),
            );
    }
    request = request.update_expression(format!("SET {}", set_clauses.join(",")));
    if let (Some(sort_field), Some(sort_value)) = (T::SORT_KEY, sort_key) {
        request = request.key(sort_field, AttributeValue::S(sort_value.to_string()));
    }
    let output = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
    Ok(output)
}
pub async fn update_item<T, U>(
partition_key: T::PK,
sort_key: Option<T::SK>,
update: U,
) -> Result<UpdateItemOutput, Error>
where
T: DynamoTable,
T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
U: Serialize + Send,
{
debug_assert!(
!(T::SORT_KEY.is_some() && sort_key.is_none()),
"update_item argument SORT_KEY is defined but sort_key argument not given"
);
let item = to_item::<_, HashMap<String, AttributeValue>>(update)?;
if cfg!(debug_assertions) {
assert_not_reserved_key(T::PARTITION_KEY);
assert_not_reserved_key(T::SORT_KEY.unwrap_or_default());
assert!(!item.is_empty());
let field_names: Vec<&str> = item.keys().map(|k| k.as_str()).collect();
let aliases: Vec<String> = (0..field_names.len()).map(attribute_name_alias).collect();
validation::validate_aliased_field_names(&field_names, &aliases);
}
let mut update_expressions: Vec<String> = Vec::with_capacity(item.len());
let mut builder = T::dynamodb_client()
.await
.update_item()
.table_name(T::TABLE)
.return_consumed_capacity(if cfg!(feature = "consumed_capacity_stats") {
ReturnConsumedCapacity::Total
} else {
ReturnConsumedCapacity::None
})
.set_return_values(Some(ReturnValue::None))
.key(
T::PARTITION_KEY,
AttributeValue::S(partition_key.to_string()),
);
for (index, (k, v)) in item.into_iter().enumerate() {
let name_alias = attribute_name_alias(index);
let val = format!(":val{index}");
update_expressions.push(format!("{name_alias} = {val}"));
builder = builder
.expression_attribute_names(name_alias, k)
.expression_attribute_values(val, v);
}
builder = builder.update_expression(format!("SET {}", update_expressions.join(",")));
if let (Some(sort_key), Some(sort_value)) = (T::SORT_KEY, sort_key) {
builder = builder.key(sort_key, AttributeValue::S(sort_value.to_string()));
}
let output = builder.send().await?;
#[cfg(feature = "consumed_capacity_stats")]
crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
Ok(output)
}
/// Partially updates the item at (`partition_key`, `sort_key`) with the
/// serializable `update` struct, optionally guarded by a DynamoDB condition
/// expression.
///
/// Every top-level field of `update` becomes one `SET #nX = :valX` clause;
/// attribute names are aliased (`#n0`, `#n1`, ...) so reserved words are
/// accepted. When `condition_expression` is `Some`, it is attached together
/// with the serialized `condition_expression_values`; the values are
/// ignored entirely when the expression itself is `None`.
///
/// # Errors
/// Serialization failures from `serde_dynamo` and any DynamoDB error
/// (including a failed conditional check) surface as `Error`.
pub async fn update_item_with_condition<T, U, C>(
    partition_key: T::PK,
    sort_key: Option<T::SK>,
    update: U,
    condition_expression: Option<String>,
    condition_expression_values: Option<C>,
) -> Result<UpdateItemOutput, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    U: Serialize + Send,
    C: Serialize,
{
    debug_assert!(
        !(T::SORT_KEY.is_some() && sort_key.is_none()),
        "update_item_with_condition argument SORT_KEY is defined but sort_key argument not given"
    );
    // Serialize the update payload into an attribute map; each entry becomes
    // one SET clause below.
    let item = to_item::<_, HashMap<String, AttributeValue>>(update)?;
    // Debug-only sanity checks: key names must not be reserved words, the
    // update must not be empty, and field names must not collide with the
    // generated aliases.
    if cfg!(debug_assertions) {
        assert_not_reserved_key(T::PARTITION_KEY);
        assert_not_reserved_key(T::SORT_KEY.unwrap_or_default());
        assert!(!item.is_empty());
        let field_names: Vec<&str> = item.keys().map(|k| k.as_str()).collect();
        let aliases: Vec<String> = (0..field_names.len()).map(attribute_name_alias).collect();
        validation::validate_aliased_field_names(&field_names, &aliases);
    }
    let mut update_expressions: Vec<String> = Vec::with_capacity(item.len());
    let mut builder = T::dynamodb_client()
        .await
        .update_item()
        .table_name(T::TABLE)
        // Only request capacity reporting when the stats feature is enabled.
        .return_consumed_capacity(if cfg!(feature = "consumed_capacity_stats") {
            ReturnConsumedCapacity::Total
        } else {
            ReturnConsumedCapacity::None
        })
        .set_return_values(Some(ReturnValue::None))
        .key(
            T::PARTITION_KEY,
            AttributeValue::S(partition_key.to_string()),
        );
    // One aliased SET clause per updated field. NOTE: HashMap iteration order
    // is unspecified, so the alias-to-field pairing can differ between calls;
    // pairing within a single request is consistent, which is all that matters.
    for (index, (k, v)) in item.into_iter().enumerate() {
        let name_alias = attribute_name_alias(index);
        let val = format!(":val{index}");
        update_expressions.push(format!("{name_alias} = {val}"));
        builder = builder
            .expression_attribute_names(name_alias, k)
            .expression_attribute_values(val, v);
    }
    builder = builder.update_expression(format!("SET {}", update_expressions.join(",")));
    if let (Some(sort_key_field), Some(sort_value)) = (T::SORT_KEY, sort_key) {
        builder = builder.key(sort_key_field, AttributeValue::S(sort_value.to_string()));
    }
    // Attach the optional guard. Condition values are passed through under
    // their serialized keys — presumably callers name those fields as
    // `:placeholder`s (e.g. via serde rename) and avoid the generated
    // `:valN` names; TODO confirm against callers.
    if let Some(cond) = condition_expression {
        builder = builder.condition_expression(cond);
        if let Some(values) = condition_expression_values {
            let values = to_item::<_, HashMap<String, AttributeValue>>(values)?;
            for (k, v) in values {
                builder = builder.expression_attribute_values(k, v);
            }
        }
    }
    let output = builder.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
    Ok(output)
}
/// Counts the items stored under `partition_key` using a COUNT query
/// (no item data is transferred).
pub async fn count_items<T>(partition_key: &T::PK) -> Result<usize, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    validation::validate_table_keys::<T>();
    let client = T::dynamodb_client().await;
    let response = query_builder::QueryBuilder::for_table::<T>()
        .build_count_query(client, partition_key.to_string())
        .send()
        .await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(response.consumed_capacity.as_ref());
    Ok(response.count as usize)
}
/// Assembles the Scan fluent builder shared by `scan_items` and
/// `scan_items_with_filter`, seeding pagination from the optional
/// composite start key.
async fn _scan_items_builder<T>(
    exclusive_start_key: Option<CompositeKey<T::PK, T::SK>>,
) -> operation::scan::builders::ScanFluentBuilder
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    validation::validate_table_keys::<T>();
    let capacity_mode = if cfg!(feature = "consumed_capacity_stats") {
        ReturnConsumedCapacity::Total
    } else {
        ReturnConsumedCapacity::None
    };
    let mut request = T::dynamodb_client()
        .await
        .scan()
        .table_name(T::TABLE)
        .select(Select::AllAttributes)
        .return_consumed_capacity(capacity_mode);
    // Resume pagination from the caller-supplied composite key, including
    // the sort component only when the table declares one.
    if let Some((hash, range)) = exclusive_start_key {
        request =
            request.exclusive_start_key(T::PARTITION_KEY, AttributeValue::S(hash.to_string()));
        if let (Some(sort_field), Some(sort_value)) = (T::SORT_KEY, range) {
            request = request
                .exclusive_start_key(sort_field, AttributeValue::S(sort_value.to_string()));
        }
    }
    request
}
/// One page of a full-table scan. A `limit` of `Some(0)` short-circuits to
/// an empty page without touching DynamoDB.
pub async fn scan_items<T>(
    limit: Option<u16>,
    exclusive_start_key: Option<CompositeKey<T::PK, T::SK>>,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let request = _scan_items_builder::<T>(exclusive_start_key)
        .await
        .limit(page_size as i32);
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
/// One page of a full-table scan with a server-side filter expression.
///
/// The filter runs after items are read, so `limit` bounds examined items,
/// not matched items.
pub async fn scan_items_with_filter<T, U>(
    limit: Option<u16>,
    exclusive_start_key: Option<CompositeKey<T::PK, T::SK>>,
    filter_expression: String,
    filter_expression_values: U,
) -> Result<OutputItems<T>, Error>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    U: Serialize,
{
    // Debug-only sanity check of the placeholder values.
    if cfg!(debug_assertions) {
        validation::validate_filter_expression_values(&filter_expression_values);
    }
    if matches!(limit, Some(0)) {
        return Ok(OutputItems::default());
    }
    let page_size = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    let values: HashMap<String, AttributeValue> = to_item(filter_expression_values)?;
    let mut request = _scan_items_builder::<T>(exclusive_start_key)
        .await
        .filter_expression(filter_expression)
        .limit(page_size as i32);
    // Bind every serialized placeholder value onto the request.
    for (placeholder, value) in values {
        request = request.expression_attribute_values(placeholder, value);
    }
    let result = request.send().await?;
    #[cfg(feature = "consumed_capacity_stats")]
    crate::consumed_capacity::stats::record_from_option(result.consumed_capacity.as_ref());
    Ok(OutputItems::from((result, page_size)))
}
pub async fn remove_attributes<T>(
partition_key: T::PK,
sort_key: Option<T::SK>,
attributes: &[&str],
) -> Result<UpdateItemOutput, Error>
where
T: DynamoTable,
T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
assert!(!attributes.is_empty(), "attributes list must not be empty");
let remove_expr = format!("REMOVE {}", attributes.join(", "));
let mut builder = T::dynamodb_client()
.await
.update_item()
.table_name(T::TABLE)
.key(
T::PARTITION_KEY,
AttributeValue::S(partition_key.to_string()),
)
.update_expression(remove_expr)
.return_consumed_capacity(if cfg!(feature = "consumed_capacity_stats") {
ReturnConsumedCapacity::Total
} else {
ReturnConsumedCapacity::None
})
.set_return_values(Some(ReturnValue::None));
if let (Some(sort_key_field), Some(sort_value)) = (T::SORT_KEY, sort_key) {
builder = builder.key(sort_key_field, AttributeValue::S(sort_value.to_string()));
}
let output = builder.send().await?;
#[cfg(feature = "consumed_capacity_stats")]
crate::consumed_capacity::stats::record_from_option(output.consumed_capacity.as_ref());
Ok(output)
}
/// Streams every item matching the query, transparently paginating until
/// the query is exhausted.
///
/// Unlike the page-based `query_items*` helpers, `limit` here sets the
/// paginator's *page size* (default `T::DEFAULT_PAGE_SIZE`), not a cap on
/// the total number of items yielded.
///
/// Each raw DynamoDB item is deserialized lazily; SDK and deserialization
/// errors surface as `Err` elements in the stream.
/// NOTE(review): unlike the non-stream variants, consumed-capacity stats
/// are not recorded here — confirm whether that is intentional.
pub async fn query_items_stream<T>(
    partition_key: &T::PK,
    sort_key: Option<&T::SK>,
    exclusive_start_key: Option<&T::SK>,
    limit: Option<u16>,
    index_name: Option<String>,
    scan_index_forward: bool,
) -> impl Stream<Item = Result<T, Error>>
where
    T: DynamoTable,
    T::PK: fmt::Display + Clone + Send + Sync + fmt::Debug,
    T::SK: fmt::Display + Clone + Send + Sync + fmt::Debug,
{
    let limit = limit.unwrap_or(T::DEFAULT_PAGE_SIZE);
    _query_items_builder::<T>(
        partition_key,
        sort_key,
        exclusive_start_key,
        limit,
        index_name,
        scan_index_forward,
    )
    .await
    .into_paginator()
    .page_size(limit as i32)
    .items()
    .send()
    // Bridge the SDK's pagination stream into a futures-0.3 `Stream`.
    .into_stream_03x()
    .map_err(Into::into)
    // Deserialize each raw attribute map into `T`.
    .and_then(|item| async { from_item(item).map_err(Into::into) })
}