use super::dynamodb_utils::{retry_batch_operation, MAX_RETRIES};
use super::error::{StorageError, StorageResult};
use super::traits::{KvStore, NamespacedStore};
use crate::retry_operation;
use async_trait::async_trait;
use aws_sdk_dynamodb::types::{
AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
ScalarAttributeType, TableStatus,
};
use aws_sdk_dynamodb::Client;
use std::collections::HashMap;
use std::sync::Arc;
/// Generic key/value store backed by a single DynamoDB table.
///
/// Every item written by this store is placed under the partition key `user_id`; the sort
/// key is the caller's key decoded as (lossy) UTF-8.
pub struct DynamoDbKvStore {
    client: Arc<Client>,
    table_name: String,
    user_id: String,
}

impl DynamoDbKvStore {
    /// Creates a store that reads and writes `table_name` on behalf of `user_id`.
    pub fn new(client: Arc<Client>, table_name: String, user_id: String) -> Self {
        Self { client, table_name, user_id }
    }

    /// Test-only view of the partition key shared by all of this store's items.
    #[cfg(test)]
    pub fn get_partition_key(&self) -> String {
        self.get_partition_key_impl()
    }

    /// Partition key shared by every item of this store: the owning user's id.
    fn get_partition_key_impl(&self) -> String {
        self.user_id.clone()
    }

    /// Partition key for a particular `key`.
    ///
    /// The key is currently ignored, so all of a user's items share one partition; this is
    /// what lets `scan_prefix` cover every key with a single partition query.
    fn get_partition_key_with_key(&self, _key: &[u8]) -> String {
        self.get_partition_key_impl()
    }

    /// Test-only view of the sort key derived from `key`.
    #[cfg(test)]
    pub fn make_sort_key(&self, key: &[u8]) -> String {
        self.make_sort_key_impl(key)
    }

    /// Sort key for `key`: its bytes decoded as UTF-8, with invalid sequences replaced by
    /// U+FFFD (so distinct non-UTF-8 keys may map to the same sort key).
    fn make_sort_key_impl(&self, key: &[u8]) -> String {
        String::from_utf8_lossy(key).into_owned()
    }
}
#[async_trait]
impl KvStore for DynamoDbKvStore {
async fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(key);
let key_str = String::from_utf8_lossy(key);
let result = retry_operation!(
self.client
.get_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk.clone()))
.key("SK", AttributeValue::S(sk.clone()))
.send(),
"get_item",
&self.table_name,
Some(&key_str),
MAX_RETRIES,
StorageError::DynamoDbError
)?;
if let Some(item) = result.item {
if let Some(AttributeValue::S(json_str)) = item.get("Value") {
return Ok(Some(json_str.as_bytes().to_vec()));
}
}
Ok(None)
}
async fn put(&self, key: &[u8], value: Vec<u8>) -> StorageResult<()> {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(key);
let key_str = String::from_utf8_lossy(key);
let json_str = String::from_utf8(value.clone()).map_err(|e| {
StorageError::SerializationError(format!("Invalid UTF-8 in value: {}", e))
})?;
retry_operation!(
self.client
.put_item()
.table_name(&self.table_name)
.item("PK", AttributeValue::S(pk.clone()))
.item("SK", AttributeValue::S(sk.clone()))
.item("Value", AttributeValue::S(json_str.clone()))
.send(),
"put_item",
&self.table_name,
Some(&key_str),
MAX_RETRIES,
StorageError::DynamoDbError
)?;
Ok(())
}
async fn delete(&self, key: &[u8]) -> StorageResult<bool> {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(key);
let key_str = String::from_utf8_lossy(key);
let result = retry_operation!(
self.client
.delete_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk.clone()))
.key("SK", AttributeValue::S(sk.clone()))
.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld)
.send(),
"delete_item",
&self.table_name,
Some(&key_str),
MAX_RETRIES,
StorageError::DynamoDbError
)?;
Ok(result.attributes.is_some())
}
async fn exists(&self, key: &[u8]) -> StorageResult<bool> {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(key);
let key_str = String::from_utf8_lossy(key);
let result = retry_operation!(
self.client
.get_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk.clone()))
.key("SK", AttributeValue::S(sk.clone()))
.projection_expression("PK") .send(),
"get_item",
&self.table_name,
Some(&key_str),
MAX_RETRIES,
StorageError::DynamoDbError
)?;
Ok(result.item.is_some())
}
async fn scan_prefix(&self, prefix: &[u8]) -> StorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
let prefix_str = String::from_utf8_lossy(prefix).to_string();
let pk = self.get_partition_key_impl();
let mut results = Vec::new();
let mut last_evaluated_key: Option<HashMap<String, AttributeValue>> = None;
loop {
let mut query = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("PK = :pk AND begins_with(SK, :prefix)")
.expression_attribute_values(":pk", AttributeValue::S(pk.clone()))
.expression_attribute_values(":prefix", AttributeValue::S(prefix_str.clone()));
if let Some(key) = last_evaluated_key.take() {
query = query.set_exclusive_start_key(Some(key));
}
let response = match query.send().await {
Ok(r) => r,
Err(e) => {
let error_str = e.to_string();
if error_str.contains("ResourceNotFoundException")
|| error_str.contains("ResourceInUseException")
|| error_str.contains("cannot do operations on a non-existent table")
{
return Ok(Vec::new());
}
return Err(StorageError::DynamoDbError(error_str));
}
};
if let Some(items) = response.items {
for item in items {
if let (Some(AttributeValue::S(sk)), Some(AttributeValue::S(json_str))) =
(item.get("SK"), item.get("Value"))
{
results.push((sk.as_bytes().to_vec(), json_str.as_bytes().to_vec()));
}
}
}
last_evaluated_key = response.last_evaluated_key;
if last_evaluated_key.is_none() {
break;
}
}
Ok(results)
}
async fn batch_put(&self, items: Vec<(Vec<u8>, Vec<u8>)>) -> StorageResult<()> {
const BATCH_SIZE: usize = 25;
for chunk in items.chunks(BATCH_SIZE) {
let mut write_requests = Vec::new();
for (key, value) in chunk {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(key);
let mut item = HashMap::new();
let json_str = String::from_utf8(value.clone()).map_err(|e| {
StorageError::SerializationError(format!("Invalid UTF-8 in batch value: {}", e))
})?;
item.insert("PK".to_string(), AttributeValue::S(pk));
item.insert("SK".to_string(), AttributeValue::S(sk));
item.insert("Value".to_string(), AttributeValue::S(json_str));
write_requests.push(
aws_sdk_dynamodb::types::WriteRequest::builder()
.put_request(
aws_sdk_dynamodb::types::PutRequest::builder()
.set_item(Some(item))
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build put request for table '{}': {}",
self.table_name, e
))
})?,
)
.build(),
);
}
retry_batch_operation(
|requests| {
let mut req_map = HashMap::new();
req_map.insert(self.table_name.clone(), requests.to_vec());
Box::pin(
self.client
.batch_write_item()
.set_request_items(Some(req_map))
.send(),
)
},
&self.table_name,
write_requests,
)
.await
.map_err(StorageError::DynamoDbError)?;
}
Ok(())
}
async fn batch_delete(&self, keys: Vec<Vec<u8>>) -> StorageResult<()> {
const BATCH_SIZE: usize = 25;
for chunk in keys.chunks(BATCH_SIZE) {
let mut write_requests = Vec::new();
for key in chunk {
let pk = self.get_partition_key_with_key(key);
let sk = self.make_sort_key_impl(&key);
let mut key_map = HashMap::new();
key_map.insert("PK".to_string(), AttributeValue::S(pk));
key_map.insert("SK".to_string(), AttributeValue::S(sk));
write_requests.push(
aws_sdk_dynamodb::types::WriteRequest::builder()
.delete_request(
aws_sdk_dynamodb::types::DeleteRequest::builder()
.set_key(Some(key_map))
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build delete request for table '{}': {}",
self.table_name, e
))
})?,
)
.build(),
);
}
retry_batch_operation(
|requests| {
let mut req_map = HashMap::new();
req_map.insert(self.table_name.clone(), requests.to_vec());
Box::pin(
self.client
.batch_write_item()
.set_request_items(Some(req_map))
.send(),
)
},
&self.table_name,
write_requests,
)
.await
.map_err(StorageError::DynamoDbError)?;
}
Ok(())
}
async fn flush(&self) -> StorageResult<()> {
Ok(())
}
fn backend_name(&self) -> &'static str {
"dynamodb"
}
fn execution_model(&self) -> super::traits::ExecutionModel {
super::traits::ExecutionModel::Async
}
fn flush_behavior(&self) -> super::traits::FlushBehavior {
super::traits::FlushBehavior::NoOp
}
}
/// Strategy for mapping a namespace name to the DynamoDB table that stores it.
#[derive(Debug, Clone)]
pub enum TableNameResolver {
    /// The table is named `"{prefix}-{namespace}"`.
    Prefix(String),
    /// The table name is looked up by namespace; an unmapped namespace is a configuration
    /// error.
    Explicit(HashMap<String, String>),
}
/// Key/value store for `feature:term` keys, used for the `native_index` namespace.
///
/// Each key is split at its first `:`. The feature selects the partition
/// (`"{user_id}:{feature}"`) and the term becomes the sort key.
pub struct DynamoDbNativeIndexStore {
    client: Arc<Client>,
    table_name: String,
    user_id: String,
}

impl DynamoDbNativeIndexStore {
    /// Creates a store over `table_name` scoped to `user_id`.
    fn new(client: Arc<Client>, table_name: String, user_id: String) -> Self {
        Self { client, table_name, user_id }
    }

    /// Splits `key` (decoded as lossy UTF-8) at its first `:` into `(feature, term)`.
    ///
    /// Everything after the first colon, including further colons, belongs to the term.
    ///
    /// # Errors
    /// `StorageError::SerializationError` when the key contains no `:`.
    fn parse_key(&self, key: &[u8]) -> StorageResult<(String, String)> {
        let key_str = String::from_utf8_lossy(key);
        match key_str.split_once(':') {
            Some((feature, term)) => Ok((feature.to_owned(), term.to_owned())),
            None => Err(StorageError::SerializationError(format!(
                "Invalid key format: missing colon in '{}'",
                key_str
            ))),
        }
    }

    /// Partition key for `feature`: `"{user_id}:{feature}"`.
    fn get_partition_key(&self, feature: &str) -> String {
        format!("{}:{}", self.user_id, feature)
    }
}
pub struct DynamoDbNamespacedStore {
client: Arc<Client>,
resolver: TableNameResolver,
auto_create: bool,
user_id: String,
}
impl DynamoDbNamespacedStore {
pub fn new(
client: Client,
resolver: TableNameResolver,
auto_create: bool,
user_id: String,
) -> Self {
Self {
client: Arc::new(client),
resolver,
auto_create,
user_id,
}
}
pub fn new_with_prefix(client: Client, prefix: String, user_id: String) -> Self {
Self::new(client, TableNameResolver::Prefix(prefix), true, user_id)
}
pub fn with_user_id(mut self, user_id: String) -> Self {
self.user_id = user_id;
self
}
fn table_name_for_namespace(&self, namespace: &str) -> StorageResult<String> {
match &self.resolver {
TableNameResolver::Prefix(prefix) => Ok(format!("{}-{}", prefix, namespace)),
TableNameResolver::Explicit(map) => map.get(namespace).cloned().ok_or_else(|| {
StorageError::ConfigurationError(format!(
"No explicit table name configured for namespace '{}'",
namespace
))
}),
}
}
#[cfg(test)]
pub fn get_table_name_for_namespace(&self, namespace: &str) -> String {
self.table_name_for_namespace(namespace)
.unwrap_or_else(|_| "unknown".to_string())
}
async fn ensure_table_exists(&self, table_name: &str) -> StorageResult<()> {
match self
.client
.describe_table()
.table_name(table_name)
.send()
.await
{
Ok(response) => {
if let Some(table) = response.table() {
if let Some(status) = table.table_status() {
if status == &aws_sdk_dynamodb::types::TableStatus::Active {
return Ok(());
} else {
log::debug!(
"Table {} exists but status is {:?}, waiting...",
table_name,
status
);
return Ok(());
}
}
}
return Ok(());
}
Err(e) => {
let error_str = e.to_string();
if error_str.contains("ResourceNotFoundException") {
} else if error_str.contains("service error") {
log::warn!("Got 'service error' when checking table {} - proceeding to attempt creation", table_name);
} else {
log::warn!(
"Unexpected error checking table {}: {} - proceeding anyway",
table_name,
error_str
);
}
}
}
let create_result = self
.client
.create_table()
.table_name(table_name)
.attribute_definitions(
AttributeDefinition::builder()
.attribute_name("PK")
.attribute_type(ScalarAttributeType::S)
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build attribute definition: {}",
e
))
})?,
)
.attribute_definitions(
AttributeDefinition::builder()
.attribute_name("SK")
.attribute_type(ScalarAttributeType::S)
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build attribute definition: {}",
e
))
})?,
)
.key_schema(
KeySchemaElement::builder()
.attribute_name("PK")
.key_type(KeyType::Hash)
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!("Failed to build key schema: {}", e))
})?,
)
.key_schema(
KeySchemaElement::builder()
.attribute_name("SK")
.key_type(KeyType::Range)
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!("Failed to build key schema: {}", e))
})?,
)
.billing_mode(BillingMode::PayPerRequest)
.send()
.await;
match create_result {
Ok(_) => {
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 30;
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
match self
.client
.describe_table()
.table_name(table_name)
.send()
.await
{
Ok(response) => {
if let Some(table) = response.table {
if let Some(status) = table.table_status {
if matches!(status, TableStatus::Active) {
return Ok(());
}
}
}
}
Err(_) => {
}
}
attempts += 1;
if attempts >= MAX_ATTEMPTS {
return Err(StorageError::DynamoDbError(format!(
"Table '{}' did not become ACTIVE within timeout",
table_name
)));
}
}
}
Err(e) => {
if e.to_string().contains("ResourceInUseException") {
Ok(())
} else {
Err(StorageError::DynamoDbError(format!(
"Failed to create table {}: {}",
table_name, e
)))
}
}
}
}
}
#[async_trait]
impl KvStore for DynamoDbNativeIndexStore {
async fn get(&self, key: &[u8]) -> StorageResult<Option<Vec<u8>>> {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let result = self
.client
.get_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk))
.key("SK", AttributeValue::S(term))
.send()
.await
.map_err(|e| StorageError::DynamoDbError(e.to_string()))?;
if let Some(item) = result.item {
if let Some(AttributeValue::S(json_str)) = item.get("Value") {
return Ok(Some(json_str.as_bytes().to_vec()));
}
}
Ok(None)
}
async fn put(&self, key: &[u8], value: Vec<u8>) -> StorageResult<()> {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let json_str = String::from_utf8(value).map_err(|e| {
StorageError::SerializationError(format!("Invalid UTF-8 in value: {}", e))
})?;
self.client
.put_item()
.table_name(&self.table_name)
.item("PK", AttributeValue::S(pk))
.item("SK", AttributeValue::S(term))
.item("Value", AttributeValue::S(json_str))
.send()
.await
.map_err(|e| StorageError::DynamoDbError(e.to_string()))?;
Ok(())
}
async fn delete(&self, key: &[u8]) -> StorageResult<bool> {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let result = self
.client
.delete_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk))
.key("SK", AttributeValue::S(term))
.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld)
.send()
.await
.map_err(|e| StorageError::DynamoDbError(e.to_string()))?;
Ok(result.attributes.is_some())
}
async fn exists(&self, key: &[u8]) -> StorageResult<bool> {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let result = self
.client
.get_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(pk))
.key("SK", AttributeValue::S(term))
.projection_expression("PK")
.send()
.await
.map_err(|e| StorageError::DynamoDbError(e.to_string()))?;
Ok(result.item.is_some())
}
async fn scan_prefix(&self, prefix: &[u8]) -> StorageResult<Vec<(Vec<u8>, Vec<u8>)>> {
let prefix_str = String::from_utf8_lossy(prefix);
let (feature, term_prefix) = if let Some(colon_pos) = prefix_str.find(':') {
let feature = prefix_str[..colon_pos].to_string();
let term_prefix = prefix_str[colon_pos + 1..].to_string();
(feature, term_prefix)
} else {
return Err(StorageError::SerializationError(format!(
"Invalid prefix format: missing colon in '{}'",
prefix_str
)));
};
let pk = self.get_partition_key(&feature);
let mut results = Vec::new();
let mut last_evaluated_key: Option<HashMap<String, AttributeValue>> = None;
loop {
let mut query = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("PK = :pk AND begins_with(SK, :prefix)")
.expression_attribute_values(":pk", AttributeValue::S(pk.clone()))
.expression_attribute_values(":prefix", AttributeValue::S(term_prefix.clone()));
if let Some(key) = last_evaluated_key.take() {
query = query.set_exclusive_start_key(Some(key));
}
let response = match query.send().await {
Ok(r) => r,
Err(e) => {
let error_str = e.to_string();
if error_str.contains("ResourceNotFoundException")
|| error_str.contains("ResourceInUseException")
|| error_str.contains("cannot do operations on a non-existent table")
{
return Ok(Vec::new());
}
return Err(StorageError::DynamoDbError(error_str));
}
};
if let Some(items) = response.items {
for item in items {
if let (Some(AttributeValue::S(sk)), Some(AttributeValue::S(json_str))) =
(item.get("SK"), item.get("Value"))
{
let full_key = format!("{}:{}", feature, sk);
results.push((full_key.as_bytes().to_vec(), json_str.as_bytes().to_vec()));
}
}
}
last_evaluated_key = response.last_evaluated_key;
if last_evaluated_key.is_none() {
break;
}
}
Ok(results)
}
async fn batch_put(&self, items: Vec<(Vec<u8>, Vec<u8>)>) -> StorageResult<()> {
const BATCH_SIZE: usize = 25;
for chunk in items.chunks(BATCH_SIZE) {
let mut write_requests = Vec::new();
for (key, value) in chunk {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let json_str = String::from_utf8(value.clone()).map_err(|e| {
StorageError::SerializationError(format!("Invalid UTF-8 in batch value: {}", e))
})?;
let put_request = aws_sdk_dynamodb::types::PutRequest::builder()
.item("PK", AttributeValue::S(pk))
.item("SK", AttributeValue::S(term))
.item("Value", AttributeValue::S(json_str))
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build put request for table '{}': {}",
self.table_name, e
))
})?;
let write_request = aws_sdk_dynamodb::types::WriteRequest::builder()
.put_request(put_request)
.build();
write_requests.push(write_request);
}
retry_batch_operation(
|requests| {
Box::pin(
self.client
.batch_write_item()
.request_items(&self.table_name, requests.to_vec())
.send(),
)
},
&self.table_name,
write_requests,
)
.await
.map_err(StorageError::DynamoDbError)?;
}
Ok(())
}
async fn batch_delete(&self, keys: Vec<Vec<u8>>) -> StorageResult<()> {
const BATCH_SIZE: usize = 25;
for chunk in keys.chunks(BATCH_SIZE) {
let mut write_requests = Vec::new();
for key in chunk {
let (feature, term) = self.parse_key(key)?;
let pk = self.get_partition_key(&feature);
let delete_request = aws_sdk_dynamodb::types::DeleteRequest::builder()
.key("PK", AttributeValue::S(pk))
.key("SK", AttributeValue::S(term))
.build()
.map_err(|e| {
StorageError::DynamoDbError(format!(
"Failed to build delete request for table '{}': {}",
self.table_name, e
))
})?;
let write_request = aws_sdk_dynamodb::types::WriteRequest::builder()
.delete_request(delete_request)
.build();
write_requests.push(write_request);
}
retry_batch_operation(
|requests| {
Box::pin(
self.client
.batch_write_item()
.request_items(&self.table_name, requests.to_vec())
.send(),
)
},
&self.table_name,
write_requests,
)
.await
.map_err(StorageError::DynamoDbError)?;
}
Ok(())
}
async fn flush(&self) -> StorageResult<()> {
Ok(())
}
fn backend_name(&self) -> &'static str {
"dynamodb-native-index"
}
fn execution_model(&self) -> super::traits::ExecutionModel {
super::traits::ExecutionModel::Async
}
fn flush_behavior(&self) -> super::traits::FlushBehavior {
super::traits::FlushBehavior::NoOp
}
}
#[async_trait]
impl NamespacedStore for DynamoDbNamespacedStore {
    /// Resolves the table for namespace `name` and wraps it in a store.
    ///
    /// When `auto_create` is set, the table is created first if needed. The namespace
    /// `"native_index"` gets a [`DynamoDbNativeIndexStore`] (`feature:term` keys); every
    /// other namespace gets a [`DynamoDbKvStore`].
    ///
    /// # Errors
    /// Propagates table-name resolution and table-creation errors.
    async fn open_namespace(&self, name: &str) -> StorageResult<Arc<dyn KvStore>> {
        let table_name = self.table_name_for_namespace(name)?;
        if self.auto_create {
            self.ensure_table_exists(&table_name).await?;
        }
        let client = Arc::clone(&self.client);
        let user_id = self.user_id.clone();
        let store: Arc<dyn KvStore> = match name {
            "native_index" => Arc::new(DynamoDbNativeIndexStore::new(client, table_name, user_id)),
            _ => Arc::new(DynamoDbKvStore::new(client, table_name, user_id)),
        };
        Ok(store)
    }

    /// Always fails with `StorageError::InvalidOperation`; listing is not supported here.
    async fn list_namespaces(&self) -> StorageResult<Vec<String>> {
        let message =
            String::from("list_namespaces not implemented for DynamoDB - requires custom implementation");
        Err(StorageError::InvalidOperation(message))
    }

    /// Always fails with `StorageError::InvalidOperation`; deletion is not supported here.
    async fn delete_namespace(&self, _name: &str) -> StorageResult<bool> {
        let message =
            String::from("delete_namespace not implemented for DynamoDB - requires custom implementation");
        Err(StorageError::InvalidOperation(message))
    }
}
#[cfg(test)]
mod unit_tests {
    use super::*;
    use aws_sdk_dynamodb::config::Region;

    /// Builds a client for `us-east-1`. The tests below only exercise key and table-name
    /// helpers, so this client is never used to send a request.
    async fn create_dummy_client() -> Arc<Client> {
        let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .load()
            .await;
        Arc::new(Client::new(&config))
    }

    #[tokio::test]
    async fn test_kv_store_key_generation() {
        let client = create_dummy_client().await;
        let store = DynamoDbKvStore::new(
            client.clone(),
            "TestTable".to_string(),
            "user123".to_string(),
        );
        let key = b"my_key";
        let pk = store.get_partition_key_with_key(key);
        let sk = store.make_sort_key_impl(key);
        assert_eq!(pk, "user123");
        assert_eq!(sk, "my_key");
        // The test-only wrappers must agree with the private implementations.
        assert_eq!(store.get_partition_key(), pk);
        assert_eq!(store.make_sort_key(key), sk);
        let store_default = DynamoDbKvStore::new(
            client.clone(),
            "TestTable".to_string(),
            "default".to_string(),
        );
        let pk_default = store_default.get_partition_key_with_key(key);
        assert_eq!(pk_default, "default");
    }

    #[tokio::test]
    async fn test_native_index_key_parsing() {
        let client = create_dummy_client().await;
        let store =
            DynamoDbNativeIndexStore::new(client, "IndexTable".to_string(), "user123".to_string());
        let (feature, term) = store.parse_key(b"word:hello").unwrap();
        assert_eq!(feature, "word");
        assert_eq!(term, "hello");
        let (feature, term) = store.parse_key(b"email:test@example.com").unwrap();
        assert_eq!(feature, "email");
        assert_eq!(term, "test@example.com");
        let result = store.parse_key(b"just_a_word");
        assert!(result.is_err());
        let (feature, term) = store.parse_key(b"word:").unwrap();
        assert_eq!(feature, "word");
        assert_eq!(term, "");
    }

    #[tokio::test]
    async fn test_native_index_key_parsing_splits_at_first_colon() {
        let client = create_dummy_client().await;
        let store =
            DynamoDbNativeIndexStore::new(client, "IndexTable".to_string(), "user123".to_string());
        let (feature, term) = store.parse_key(b"url:http://example.com").unwrap();
        assert_eq!(feature, "url");
        assert_eq!(term, "http://example.com");
    }

    #[tokio::test]
    async fn test_native_index_partition_key() {
        let client = create_dummy_client().await;
        let store = DynamoDbNativeIndexStore::new(
            client.clone(),
            "IndexTable".to_string(),
            "user123".to_string(),
        );
        let pk = store.get_partition_key("word");
        assert_eq!(pk, "user123:word");
        let store_default =
            DynamoDbNativeIndexStore::new(client, "IndexTable".to_string(), "default".to_string());
        let pk_default = store_default.get_partition_key("email");
        assert_eq!(pk_default, "default:email");
    }

    #[tokio::test]
    async fn test_table_name_resolution() {
        let client = create_dummy_client().await;
        let prefixed = DynamoDbNamespacedStore::new_with_prefix(
            (*client).clone(),
            "app".to_string(),
            "user123".to_string(),
        );
        assert_eq!(prefixed.get_table_name_for_namespace("main"), "app-main");

        let mut tables = HashMap::new();
        tables.insert("main".to_string(), "MainTable".to_string());
        let explicit = DynamoDbNamespacedStore::new(
            (*client).clone(),
            TableNameResolver::Explicit(tables),
            false,
            "user123".to_string(),
        );
        assert_eq!(explicit.get_table_name_for_namespace("main"), "MainTable");
        assert!(explicit.table_name_for_namespace("missing").is_err());
        assert_eq!(explicit.get_table_name_for_namespace("missing"), "unknown");
    }
}