# DynamoDB Table Abstraction
A high-level, type-safe DynamoDB table abstraction for Rust with support for batch operations, pagination, Global Secondary Indexes, and more.
## Features

- **Type-safe**: Leverage Rust's type system with `serde` for automatic serialization
- **Async-first**: Built on `tokio` and `aws-sdk-dynamodb`
- **Auto-initialization**: Client automatically initializes with sensible defaults on first use
- **Batch operations**: Efficiently process multiple items with automatic batching and retry logic
- **Streaming**: Handle large result sets with async streams
- **Pagination**: Built-in cursor-based pagination support
- **Reserved word validation**: Debug-mode checks for DynamoDB reserved words
- **GSI support**: Query and scan Global Secondary Indexes
- **Optimistic locking**: Conditional expression support for safe concurrent updates
- **Automatic retries**: Exponential backoff with adaptive retry mode
- **Simple initialization**: Global client pattern with easy setup or auto-initialization
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
dynamo_table = "0.1"
aws-config = "1"
aws-sdk-dynamodb = "1"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
## Quick Start

### Option 1: Auto-Initialization (Recommended for Getting Started)

The client automatically initializes with sensible defaults on first use:

```rust
use dynamo_table::DynamoTable;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    user_id: String,
    email: String,
    name: String,
}

impl DynamoTable for User {
    type PK = String;
    type SK = String;
    const TABLE: &'static str = "users";
    const PARTITION_KEY: &'static str = "user_id";
    const SORT_KEY: Option<&'static str> = None;

    fn partition_key(&self) -> Self::PK {
        self.user_id.clone()
    }
}

#[tokio::main]
async fn main() -> Result<(), dynamo_table::Error> {
    let user = User {
        user_id: "user123".to_string(),
        email: "user@example.com".to_string(),
        name: "John Doe".to_string(),
    };

    // Write the item, then read it back by partition key.
    user.add_item().await?;
    let retrieved = User::get_item(&"user123".to_string(), None).await?;
    println!("Retrieved: {:?}", retrieved);
    Ok(())
}
```
Auto-initialization provides:

- Adaptive retry mode with 3 max attempts
- Exponential backoff starting at 1 second
- Connect timeout: 3 seconds
- Read timeout: 20 seconds
- Operation timeout: 60 seconds
- LocalStack support via the `AWS_PROFILE=localstack` environment variable
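If you need these defaults in code (for example, to tweak a single value), they can be reproduced with the custom-initialization API described below. This is a sketch under the assumption that `RetryConfig` and `TimeoutConfig` are the SDK builders re-exported by the crate and that `RetryConfig::adaptive()` is available; verify the names against your SDK version:

```rust
use std::time::Duration;
use dynamo_table::{defaults, init, BehaviorVersion, RetryConfig, TimeoutConfig};

#[tokio::main]
async fn main() {
    // Mirror the auto-initialization defaults listed above.
    let timeout_config = TimeoutConfig::builder()
        .connect_timeout(Duration::from_secs(3))    // connect timeout: 3s
        .read_timeout(Duration::from_secs(20))      // read timeout: 20s
        .operation_timeout(Duration::from_secs(60)) // operation timeout: 60s
        .build();

    let retry_config = RetryConfig::adaptive()      // adaptive retry mode
        .with_max_attempts(3)                       // 3 max attempts
        .with_initial_backoff(Duration::from_secs(1)); // backoff starts at 1s

    let config = defaults(BehaviorVersion::latest())
        .retry_config(retry_config)
        .timeout_config(timeout_config)
        .load()
        .await;
    init(&config).await;
}
```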
### Option 2: Custom Initialization

For production applications, customize the client configuration:

```rust
use dynamo_table::{init, defaults, BehaviorVersion, Region};

#[tokio::main]
async fn main() {
    let config = defaults(BehaviorVersion::latest())
        .region(Region::new("us-west-2"))
        .load()
        .await;
    init(&config).await;
}
```
Or with a custom client:

```rust
use dynamo_table::init_with_client;

#[tokio::main]
async fn main() {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_dynamodb::Client::new(&config);
    init_with_client(client).await;
}
```
## Core Concepts

### 1. Define Your Table

```rust
use dynamo_table::DynamoTable;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    user_id: String,
    email: String,
    name: String,
    created_at: i64,
}

impl DynamoTable for User {
    type PK = String;
    type SK = String;
    const TABLE: &'static str = "users";
    const PARTITION_KEY: &'static str = "user_id";
    const SORT_KEY: Option<&'static str> = None;

    fn partition_key(&self) -> Self::PK {
        self.user_id.clone()
    }
}
```
### 2. Basic CRUD Operations

```rust
use dynamo_table::Error;
use serde_json::json;

async fn crud_examples() -> Result<(), Error> {
    let user = User {
        user_id: "user123".to_string(),
        email: "user@example.com".to_string(),
        name: "John Doe".to_string(),
        created_at: 1234567890,
    };

    // Create
    user.add_item().await?;

    // Read
    let retrieved = User::get_item(&"user123".to_string(), None).await?;
    if let Some(user) = retrieved {
        println!("Found user: {:?}", user);
    }

    // Update selected attributes
    let updates = json!({
        "name": "Jane Doe",
        "email": "jane@example.com"
    });
    user.update_item(updates).await?;

    // Delete by key, or via the item itself
    User::delete_item("user123".to_string(), None).await?;
    user.destroy_item().await?;
    Ok(())
}
```
### 3. Querying Data

```rust
async fn query_examples() -> Result<(), Error> {
    // Query a page of items by partition key (sort-key condition, limit, cursor)
    let result = User::query_items(
        &"user123".to_string(),
        None,
        Some(10),
        None,
    ).await?;
    for user in result.items {
        println!("User: {:?}", user);
    }
    if let Some(cursor) = result.last_evaluated_key {
        println!("More results available, cursor: {:?}", cursor);
    }

    // Fetch a single item, or just the count, for a partition key
    let single = User::query_item(&"user123".to_string()).await?;
    let count = User::count_items(&"user123".to_string()).await?;
    println!("Found {} users", count);
    Ok(())
}
```
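To collect every item for a partition key, resume each call from the previous page's `last_evaluated_key`. A minimal sketch using only `query_items` as shown above (page size of 25 is arbitrary):

```rust
async fn paginate_all() -> Result<Vec<User>, Error> {
    let mut users = Vec::new();
    let mut cursor = None;
    loop {
        // Fetch one page, resuming from the previous cursor.
        let page = User::query_items(&"user123".to_string(), None, Some(25), cursor).await?;
        users.extend(page.items);
        match page.last_evaluated_key {
            Some(key) => cursor = Some(key), // more pages remain
            None => break,                   // final page reached
        }
    }
    Ok(users)
}
```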
### 4. Composite Keys (Partition + Sort Key)

```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Order {
    user_id: String,
    order_id: String,
    total: f64,
    status: String,
    created_at: String,
}

impl DynamoTable for Order {
    type PK = String;
    type SK = String;
    const TABLE: &'static str = "orders";
    const PARTITION_KEY: &'static str = "user_id";
    const SORT_KEY: Option<&'static str> = Some("order_id");

    fn partition_key(&self) -> Self::PK {
        self.user_id.clone()
    }

    fn sort_key(&self) -> Option<Self::SK> {
        Some(self.order_id.clone())
    }
}
```
```rust
async fn composite_key_examples() -> Result<(), Error> {
    // All orders for a user
    let orders = Order::query_items(
        &"user123".to_string(),
        None,
        Some(20),
        None,
    ).await?;

    // A specific order by full composite key
    let order = Order::get_item(
        &"user123".to_string(),
        Some(&"order456".to_string()),
    ).await?;

    // Orders whose sort key starts with a prefix
    let orders_2024 = Order::query_begins_with(
        &"user123".to_string(),
        "2024-",
        Some(10),
        None,
        true,
    ).await?;

    // Orders whose sort key falls within a range
    let orders_range = Order::query_between(
        &"user123".to_string(),
        "order100",
        "order200",
        Some(50),
        None,
        true,
    ).await?;

    // Most recent orders first (descending sort-key order)
    let recent_orders = Order::reverse_query_items(
        &"user123".to_string(),
        None,
        Some(5),
        None,
    ).await?;
    Ok(())
}
```
## Advanced Features

### Global Secondary Indexes (GSI)

Use a GSI to query your data by alternate keys:

```rust
use dynamo_table::GSITable;

impl GSITable for User {
    const GSI_PARTITION_KEY: &'static str = "email";
    const GSI_SORT_KEY: Option<&'static str> = None;

    fn gsi_partition_key(&self) -> String {
        self.email.clone()
    }

    fn gsi_sort_key(&self) -> Option<String> {
        None
    }
}
```
```rust
async fn gsi_examples() -> Result<(), Error> {
    // Query items by GSI partition key
    let result = User::query_gsi_items(
        "user@example.com".to_string(),
        None,
        Some(10),
        None,
    ).await?;

    // Single-item, count, and reverse-order variants
    let user = User::query_gsi_item(
        "user@example.com".to_string(),
        None,
    ).await?;
    let count = User::count_gsi_items("user@example.com".to_string()).await?;
    let results = User::reverse_query_gsi_items(
        "user@example.com".to_string(),
        None,
        Some(10),
        None,
    ).await?;
    Ok(())
}
```
### Batch Operations

Efficiently process multiple items in a single request:

```rust
async fn batch_examples() -> Result<(), Error> {
    // Batch get by (partition key, optional sort key) pairs
    let keys = vec![
        ("user1".to_string(), None),
        ("user2".to_string(), None),
        ("user3".to_string(), None),
    ];
    let result = User::batch_get(keys).await?;
    println!("Retrieved {} items", result.items.len());
    if !result.failed_keys.is_empty() {
        println!("Failed to retrieve: {:?}", result.failed_keys);
    }

    // Batch write and delete (user1, user2, ... are previously constructed User values)
    let new_users = vec![user1, user2, user3];
    let write_result = User::batch_upsert(new_users).await?;
    println!(
        "Wrote {} items in {:?}",
        write_result.items.len(),
        write_result.execution_time
    );
    let users_to_delete = vec![user1, user2];
    let delete_result = User::batch_delete(users_to_delete).await?;

    // Mixed writes and deletes in one call
    let batch_result = batch_write(
        vec![user_to_write1, user_to_write2],
        vec![user_to_delete1, user_to_delete2],
    ).await?;
    Ok(())
}
```
### Streaming Large Result Sets

Use streams to process large datasets without loading everything into memory:

```rust
use futures_util::StreamExt;
use serde_json::json;

async fn streaming_examples() -> Result<(), Error> {
    // Stream query results, yielding one item at a time
    let stream = User::query_stream(
        &"user123".to_string(),
        None,
        Some(100),
    );
    tokio::pin!(stream);

    let mut count = 0;
    while let Some(result) = stream.next().await {
        match result {
            Ok(user) => {
                println!("Processing user: {:?}", user);
                count += 1;
            }
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    println!("Processed {} users total", count);

    // Streams can also apply a filter expression
    let stream_with_filter = User::query_stream_with_filter(
        &"user123".to_string(),
        None,
        Some(50),
        "status = :active".to_string(),
        json!({ ":active": "active" }),
    );
    Ok(())
}
```
### Filter Expressions

Apply filters to query results:

```rust
use serde_json::json;

async fn filter_examples() -> Result<(), Error> {
    // Filter a query by additional attributes
    let active_users = User::query_items_with_filter(
        &"user123".to_string(),
        None,
        Some(50),
        None,
        "status = :status AND created_at > :timestamp".to_string(),
        json!({
            ":status": "active",
            ":timestamp": 1700000000
        }),
    ).await?;

    // Filtered scan
    let premium_users = User::scan_items_with_filter(
        Some(100),
        None,
        "subscription_tier = :tier".to_string(),
        json!({ ":tier": "premium" }),
    ).await?;

    // Filtered GSI query
    let filtered_gsi = User::query_gsi_items_with_filter(
        "user@example.com".to_string(),
        None,
        None,
        Some(20),
        true,
        "account_status = :status".to_string(),
        json!({ ":status": "verified" }),
    ).await?;
    Ok(())
}
```
### Conditional Updates (Optimistic Locking)

Prevent race conditions with conditional expressions:

```rust
use serde_json::json;

async fn conditional_update_examples() -> Result<(), Error> {
    // `user` is a previously loaded User value
    let updates = json!({
        "status": "active",
        "updated_at": 1234567890
    });
    let result = user.update_item_with_condition(
        updates,
        Some("attribute_exists(user_id) AND #status = :old_status".to_string()),
        Some(json!({ ":old_status": "pending" })),
    ).await;
    match result {
        Ok(_) => println!("Updated successfully"),
        Err(e) if e.is_conditional_check_failed() => {
            println!("Condition failed - concurrent modification or wrong state");
        }
        Err(e) => return Err(e),
    }

    // Create only if the item does not already exist
    let new_user = User {
        user_id: "newuser".to_string(),
        email: "new@example.com".to_string(),
        name: "New User".to_string(),
    };
    match new_user.add_item_with_condition(
        Some("attribute_not_exists(user_id)".to_string()),
        None,
    ).await {
        Ok(_) => println!("Created new user"),
        Err(e) if e.is_conditional_check_failed() => {
            println!("User already exists");
        }
        Err(e) => return Err(e),
    }
    Ok(())
}
```
### Atomic Counters

Increment numeric fields atomically:

```rust
async fn counter_examples() -> Result<(), Error> {
    // Increment a single field
    User::increment_multiple(
        &"user123".to_string(),
        None,
        &[("login_count", 1)],
    ).await?;

    // Increment several fields in one request
    User::increment_multiple(
        &"user123".to_string(),
        None,
        &[
            ("login_count", 1),
            ("points", 10),
            ("streak", 1),
        ],
    ).await?;
    Ok(())
}
```
### Scanning Tables

Use scans sparingly for full table operations:

```rust
async fn scan_examples() -> Result<(), Error> {
    // Single page
    let result = User::scan_items(Some(100), None).await?;
    for user in result.items {
        println!("User: {:?}", user);
    }

    // Paginate through the whole table with a cursor
    let mut cursor = None;
    loop {
        let result = User::scan_items(Some(50), cursor).await?;
        for user in result.items {
            println!("Processing: {:?}", user);
        }
        match result.last_evaluated_key {
            Some(key) => cursor = Some(key),
            None => break,
        }
    }
    Ok(())
}
```
## Configuration

### Table-Specific Configuration

Override defaults for individual tables:

```rust
use std::time::Duration;
use dynamo_table::RetryConfig;

impl DynamoTable for User {
    type PK = String;
    type SK = String;
    const TABLE: &'static str = "users";
    const PARTITION_KEY: &'static str = "user_id";

    fn partition_key(&self) -> Self::PK {
        self.user_id.clone()
    }

    // Per-table overrides
    const DEFAULT_PAGE_SIZE: u16 = 50;
    const BATCH_RETRIES_CONFIG: RetryConfig = RetryConfig {
        max_retries: 3,
        initial_delay: Duration::from_millis(200),
        max_delay: Duration::from_secs(5),
    };
}
```
### Custom Client Configuration

```rust
use dynamo_table::{RetryConfig, RetryMode, TimeoutConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timeout_config = TimeoutConfig::builder()
        .connect_timeout(Duration::from_secs(5))
        .read_timeout(Duration::from_secs(30))
        .operation_timeout(Duration::from_secs(90))
        .build();

    let retry_config = RetryConfig::standard()
        .with_max_attempts(5)
        .with_initial_backoff(Duration::from_millis(500));

    let config = dynamo_table::defaults(dynamo_table::BehaviorVersion::latest())
        .region(dynamo_table::Region::new("us-east-1"))
        .retry_config(retry_config)
        .timeout_config(timeout_config)
        .load()
        .await;
    dynamo_table::init(&config).await;
}
```
## Testing

### LocalStack Support

The library automatically detects LocalStack when `AWS_PROFILE=localstack`:

```bash
export AWS_PROFILE=localstack
cargo test
```
### Custom Test Client

```rust
#[cfg(test)]
mod tests {
    use super::*;

    async fn test_client() {
        let config = aws_config::from_env()
            .endpoint_url("http://localhost:4566")
            .load()
            .await;
        let client = aws_sdk_dynamodb::Client::new(&config);
        dynamo_table::init_with_client(client).await;
    }

    #[tokio::test]
    async fn test_user_operations() {
        test_client().await;
        let user = User {
            user_id: "test_user".to_string(),
            email: "test@example.com".to_string(),
            name: "Test User".to_string(),
        };
        user.add_item().await.unwrap();
        let retrieved = User::get_item(&user.user_id, None).await.unwrap();
        assert!(retrieved.is_some());
    }
}
```
## Error Handling

The library provides a comprehensive `Error` type with helper methods:

```rust
use dynamo_table::Error;

async fn error_handling_examples() -> Result<(), Error> {
    // `user` and `updates` are defined as in the earlier examples
    match user.update_item(updates).await {
        Ok(output) => println!("Success: {:?}", output),
        Err(e) if e.is_conditional_check_failed() => {
            println!("Concurrent modification detected");
        }
        Err(e) if e.is_serialization_error() => {
            eprintln!("Data serialization failed: {}", e);
        }
        Err(e) if e.is_dynamodb_error() => {
            eprintln!("DynamoDB error: {}", e);
        }
        Err(e) => return Err(e),
    }
    Ok(())
}
```
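A common pattern is to retry a bounded number of times when `is_conditional_check_failed()` signals a lost race. A sketch reusing `update_item_with_condition` from the optimistic-locking section (the re-read step, which a real application needs, is left as a comment):

```rust
use serde_json::json;

async fn retry_conditional_update(user: &User) -> Result<(), Error> {
    const MAX_ATTEMPTS: usize = 3;
    for attempt in 0..MAX_ATTEMPTS {
        let result = user.update_item_with_condition(
            json!({ "status": "active" }),
            Some("#status = :old_status".to_string()),
            Some(json!({ ":old_status": "pending" })),
        ).await;
        match result {
            Ok(_) => return Ok(()),
            Err(e) if e.is_conditional_check_failed() && attempt + 1 < MAX_ATTEMPTS => {
                // A concurrent writer won this round: re-read the item here
                // and recompute the update before the next attempt.
                continue;
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```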
## Performance Tips
- Use batch operations for multiple items to reduce API calls and costs
- Enable streams for large result sets to avoid memory issues
- Configure appropriate page sizes based on your item size (default is 10)
- Use GSIs for alternative query patterns instead of scans
- Leverage conditional expressions to avoid race conditions
- Use projection expressions to retrieve only needed attributes
- Monitor consumed capacity during development to optimize costs
- Avoid scans when possible - prefer query or batch get operations
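As an illustration of the first tip, fetching N users with one `batch_get` call instead of N separate `get_item` round trips (a sketch using the key shape from the batch-operations section):

```rust
async fn fetch_many(ids: &[String]) -> Result<Vec<User>, Error> {
    // Build (partition key, optional sort key) pairs for every id,
    // then issue a single batched request.
    let keys: Vec<(String, Option<String>)> =
        ids.iter().map(|id| (id.clone(), None)).collect();
    let result = User::batch_get(keys).await?;
    Ok(result.items)
}
```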
## API Reference

### DynamoTable Trait Methods

**Basic Operations:**

- `add_item()` - Put an item into the table
- `get_item(pk, sk)` - Retrieve a single item
- `delete_item(pk, sk)` - Delete an item by key
- `destroy_item(self)` - Delete using the item itself
- `update_item(updates)` - Update an item
- `update_item_with_condition(...)` - Conditional update

**Query Operations:**

- `query_items(pk, sk, limit, cursor)` - Query items by partition key
- `query_item(pk)` - Query a single item
- `reverse_query_items(...)` - Query in descending order
- `query_items_with_filter(...)` - Query with a filter expression
- `query_stream(...)` - Stream query results
- `query_begins_with(...)` - Query with a sort-key prefix
- `query_between(...)` - Query a sort-key range
- `count_items(pk)` - Count items for a partition key

**Batch Operations:**

- `batch_get(keys)` - Batch get multiple items
- `batch_upsert(items)` - Batch write multiple items
- `batch_delete(items)` - Batch delete multiple items

**Scan Operations:**

- `scan_items(limit, cursor)` - Scan the table
- `scan_items_with_filter(...)` - Scan with a filter

**Utility:**

- `increment_multiple(pk, sk, fields)` - Atomic counter operations
- `dynamodb_client()` - Get the client for this table (can be overridden)

### GSITable Trait Methods

- `query_gsi_items(...)` - Query using a Global Secondary Index
- `query_gsi_item(...)` - Query a single item by GSI
- `reverse_query_gsi_items(...)` - Reverse query on a GSI
- `query_gsi_items_with_filter(...)` - Query a GSI with a filter
- `count_gsi_items(...)` - Count items by GSI key
## Examples

See the `examples/` directory for more comprehensive examples:

- `batch_write_with_retry.rs` - Advanced batch operation patterns
## License

Licensed under either of:

at your option.
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## Acknowledgments
This library was extracted from a production application's storage layer and represents battle-tested patterns for DynamoDB access in Rust. It emphasizes type safety, ergonomics, and production-ready defaults while maintaining flexibility for custom configurations.