use crate::core::{Data, DataService, LinkService, link::LinkEntity};
use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_dynamodb::Client as DynamoDBClient;
use aws_sdk_dynamodb::types::AttributeValue;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Clone)]
pub struct DynamoDBDataService<T: Data + serde::Serialize + for<'de> serde::Deserialize<'de>> {
client: DynamoDBClient,
table_name: String,
_phantom: std::marker::PhantomData<T>,
}
impl<T: Data + serde::Serialize + for<'de> serde::Deserialize<'de>> DynamoDBDataService<T> {
pub fn new(client: DynamoDBClient, table_name: String) -> Self {
Self {
client,
table_name,
_phantom: std::marker::PhantomData,
}
}
pub async fn list_by_tenant(&self, tenant_id: &Uuid) -> Result<Vec<T>>
where
T: Data + Send + Sync + 'static,
{
let result = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("tenant_id = :tenant_id")
.expression_attribute_values(":tenant_id", AttributeValue::S(tenant_id.to_string()))
.send()
.await?;
let mut entities = Vec::new();
if let Some(items) = result.items {
for item in items {
entities.push(self.item_to_entity(&item).await?);
}
}
Ok(entities)
}
pub async fn get_with_tenant(&self, tenant_id: &Uuid, id: &Uuid) -> Result<Option<T>>
where
T: Data + Send + Sync + 'static,
{
let key = HashMap::from([
(
"tenant_id".to_string(),
AttributeValue::S(tenant_id.to_string()),
),
("id".to_string(), AttributeValue::S(id.to_string())),
]);
let result = self
.client
.get_item()
.table_name(&self.table_name)
.set_key(Some(key))
.send()
.await?;
match result.item() {
Some(item) => Ok(Some(self.item_to_entity(item).await?)),
None => Ok(None),
}
}
pub async fn list_by_tenant_gsi(&self, tenant_id: &Uuid, index_name: &str) -> Result<Vec<T>>
where
T: Data + Send + Sync + 'static,
{
let result = self
.client
.query()
.table_name(&self.table_name)
.index_name(index_name)
.key_condition_expression("tenant_id = :tenant_id")
.expression_attribute_values(":tenant_id", AttributeValue::S(tenant_id.to_string()))
.send()
.await?;
let mut entities = Vec::new();
if let Some(items) = result.items {
for item in items {
entities.push(self.item_to_entity(&item).await?);
}
}
Ok(entities)
}
async fn entity_to_item(&self, entity: &T) -> Result<HashMap<String, AttributeValue>> {
let json = serde_json::to_value(entity)?;
let mut item = HashMap::new();
if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
}
for (key, value) in json.as_object().unwrap_or(&serde_json::Map::new()) {
if key != "id" {
if let Some(str_val) = value.as_str() {
item.insert(key.clone(), AttributeValue::S(str_val.to_string()));
} else if let Some(num_val) = value.as_f64() {
item.insert(key.clone(), AttributeValue::N(num_val.to_string()));
} else if let Some(bool_val) = value.as_bool() {
item.insert(key.clone(), AttributeValue::Bool(bool_val));
} else if let Some(arr) = value.as_array() {
let list_items: Vec<AttributeValue> = arr
.iter()
.filter_map(|v| {
if let Some(s) = v.as_str() {
Some(AttributeValue::S(s.to_string()))
} else if let Some(n) = v.as_f64() {
Some(AttributeValue::N(n.to_string()))
} else {
v.as_bool().map(AttributeValue::Bool)
}
})
.collect();
if !list_items.is_empty() {
item.insert(key.clone(), AttributeValue::L(list_items));
}
} else if value.is_null() {
} else {
item.insert(key.clone(), AttributeValue::S(value.to_string()));
}
}
}
Ok(item)
}
async fn item_to_entity(&self, item: &HashMap<String, AttributeValue>) -> Result<T> {
let mut json = serde_json::Map::new();
for (key, value) in item {
match value {
AttributeValue::S(s) => {
json.insert(key.clone(), serde_json::Value::String(s.clone()));
}
AttributeValue::N(n) => {
if let Ok(num) = n.parse::<f64>() {
json.insert(
key.clone(),
serde_json::Value::Number(serde_json::Number::from_f64(num).unwrap()),
);
}
}
AttributeValue::Bool(b) => {
json.insert(key.clone(), serde_json::Value::Bool(*b));
}
AttributeValue::L(list) => {
let json_array: Vec<serde_json::Value> = list
.iter()
.filter_map(|item| match item {
AttributeValue::S(s) => Some(serde_json::Value::String(s.clone())),
AttributeValue::N(n) => n
.parse::<f64>()
.ok()
.and_then(serde_json::Number::from_f64)
.map(serde_json::Value::Number),
AttributeValue::Bool(b) => Some(serde_json::Value::Bool(*b)),
_ => None,
})
.collect();
json.insert(key.clone(), serde_json::Value::Array(json_array));
}
_ => {
}
}
}
Ok(serde_json::from_value(serde_json::Value::Object(json))?)
}
}
#[async_trait]
impl<T: Data + serde::Serialize + for<'de> serde::Deserialize<'de>> DataService<T>
for DynamoDBDataService<T>
{
async fn create(&self, entity: T) -> Result<T> {
let item = self.entity_to_item(&entity).await?;
self.client
.put_item()
.table_name(&self.table_name)
.set_item(Some(item))
.send()
.await?;
Ok(entity)
}
async fn get(&self, id: &Uuid) -> Result<Option<T>> {
let key = HashMap::from([("id".to_string(), AttributeValue::S(id.to_string()))]);
let result = self
.client
.get_item()
.table_name(&self.table_name)
.set_key(Some(key))
.send()
.await?;
match result.item() {
Some(item) => Ok(Some(self.item_to_entity(item).await?)),
None => Ok(None),
}
}
async fn list(&self) -> Result<Vec<T>> {
let result = self
.client
.scan()
.table_name(&self.table_name)
.send()
.await?;
let mut entities = Vec::new();
if let Some(items) = result.items {
for item in items {
entities.push(self.item_to_entity(&item).await?);
}
}
Ok(entities)
}
async fn update(&self, _id: &Uuid, entity: T) -> Result<T> {
let item = self.entity_to_item(&entity).await?;
self.client
.put_item()
.table_name(&self.table_name)
.set_item(Some(item))
.send()
.await?;
Ok(entity)
}
async fn delete(&self, id: &Uuid) -> Result<()> {
let key = HashMap::from([("id".to_string(), AttributeValue::S(id.to_string()))]);
self.client
.delete_item()
.table_name(&self.table_name)
.set_key(Some(key))
.send()
.await?;
Ok(())
}
async fn search(&self, field: &str, value: &str) -> Result<Vec<T>> {
let result = self
.client
.scan()
.table_name(&self.table_name)
.filter_expression(format!("{} = :value", field))
.expression_attribute_values(":value", AttributeValue::S(value.to_string()))
.send()
.await?;
let mut entities = Vec::new();
if let Some(items) = result.items {
for item in items {
entities.push(self.item_to_entity(&item).await?);
}
}
Ok(entities)
}
}
pub struct DynamoDBLinkService {
client: DynamoDBClient,
table_name: String,
}
impl DynamoDBLinkService {
pub fn new(client: DynamoDBClient, table_name: String) -> Self {
Self { client, table_name }
}
pub async fn list_links_by_tenant(&self, tenant_id: &Uuid) -> Result<Vec<LinkEntity>> {
let result = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("tenant_id = :tenant_id")
.expression_attribute_values(":tenant_id", AttributeValue::S(tenant_id.to_string()))
.send()
.await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
pub async fn list_links_by_tenant_gsi(
&self,
tenant_id: &Uuid,
index_name: &str,
) -> Result<Vec<LinkEntity>> {
let result = self
.client
.query()
.table_name(&self.table_name)
.index_name(index_name)
.key_condition_expression("tenant_id = :tenant_id")
.expression_attribute_values(":tenant_id", AttributeValue::S(tenant_id.to_string()))
.send()
.await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
pub async fn find_by_source_and_tenant(
&self,
tenant_id: &Uuid,
source_id: &Uuid,
link_type: Option<&str>,
) -> Result<Vec<LinkEntity>> {
let mut query = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("tenant_id = :tenant_id")
.expression_attribute_values(":tenant_id", AttributeValue::S(tenant_id.to_string()));
let mut filter_parts = vec!["source_id = :source_id".to_string()];
query = query
.expression_attribute_values(":source_id", AttributeValue::S(source_id.to_string()));
if let Some(lt) = link_type {
filter_parts.push("link_type = :link_type".to_string());
query =
query.expression_attribute_values(":link_type", AttributeValue::S(lt.to_string()));
}
let filter_expression = filter_parts.join(" AND ");
let result = query.filter_expression(filter_expression).send().await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
async fn link_to_item(&self, link: &LinkEntity) -> Result<HashMap<String, AttributeValue>> {
let json = serde_json::to_value(link)?;
let mut item = HashMap::new();
for (key, value) in json.as_object().unwrap_or(&serde_json::Map::new()) {
match value {
serde_json::Value::String(s) => {
item.insert(key.clone(), AttributeValue::S(s.clone()));
}
serde_json::Value::Number(n) => {
if let Some(num) = n.as_f64() {
item.insert(key.clone(), AttributeValue::N(num.to_string()));
}
}
serde_json::Value::Bool(b) => {
item.insert(key.clone(), AttributeValue::Bool(*b));
}
serde_json::Value::Null => {
}
_ => {
item.insert(key.clone(), AttributeValue::S(value.to_string()));
}
}
}
Ok(item)
}
async fn item_to_link(&self, item: &HashMap<String, AttributeValue>) -> Result<LinkEntity> {
let mut json = serde_json::Map::new();
for (key, value) in item {
match value {
AttributeValue::S(s) => {
if key == "metadata" {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(s) {
json.insert(key.clone(), parsed);
} else {
json.insert(key.clone(), serde_json::Value::String(s.clone()));
}
} else {
json.insert(key.clone(), serde_json::Value::String(s.clone()));
}
}
AttributeValue::N(n) => {
if let Ok(num) = n.parse::<f64>() {
json.insert(
key.clone(),
serde_json::Value::Number(serde_json::Number::from_f64(num).unwrap()),
);
}
}
AttributeValue::Bool(b) => {
json.insert(key.clone(), serde_json::Value::Bool(*b));
}
_ => {
}
}
}
Ok(serde_json::from_value(serde_json::Value::Object(json))?)
}
}
#[async_trait]
impl LinkService for DynamoDBLinkService {
async fn create(&self, link: LinkEntity) -> Result<LinkEntity> {
let item = self.link_to_item(&link).await?;
self.client
.put_item()
.table_name(&self.table_name)
.set_item(Some(item))
.send()
.await?;
Ok(link)
}
async fn get(&self, id: &Uuid) -> Result<Option<LinkEntity>> {
let key = HashMap::from([("id".to_string(), AttributeValue::S(id.to_string()))]);
let result = self
.client
.get_item()
.table_name(&self.table_name)
.set_key(Some(key))
.send()
.await?;
match result.item() {
Some(item) => Ok(Some(self.item_to_link(item).await?)),
None => Ok(None),
}
}
async fn list(&self) -> Result<Vec<LinkEntity>> {
let result = self
.client
.scan()
.table_name(&self.table_name)
.send()
.await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
async fn find_by_source(
&self,
source_id: &Uuid,
link_type: Option<&str>,
_target_type: Option<&str>,
) -> Result<Vec<LinkEntity>> {
let mut filter_expr = "source_id = :source_id".to_string();
let mut attr_values = HashMap::new();
attr_values.insert(
":source_id".to_string(),
AttributeValue::S(source_id.to_string()),
);
if let Some(lt) = link_type {
filter_expr.push_str(" AND link_type = :link_type");
attr_values.insert(":link_type".to_string(), AttributeValue::S(lt.to_string()));
}
let mut scan = self
.client
.scan()
.table_name(&self.table_name)
.filter_expression(filter_expr);
for (key, value) in attr_values {
scan = scan.expression_attribute_values(key, value);
}
let result = scan.send().await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
async fn find_by_target(
&self,
target_id: &Uuid,
link_type: Option<&str>,
_source_type: Option<&str>,
) -> Result<Vec<LinkEntity>> {
let mut filter_expr = "target_id = :target_id".to_string();
let mut attr_values = HashMap::new();
attr_values.insert(
":target_id".to_string(),
AttributeValue::S(target_id.to_string()),
);
if let Some(lt) = link_type {
filter_expr.push_str(" AND link_type = :link_type");
attr_values.insert(":link_type".to_string(), AttributeValue::S(lt.to_string()));
}
let mut scan = self
.client
.scan()
.table_name(&self.table_name)
.filter_expression(filter_expr);
for (key, value) in attr_values {
scan = scan.expression_attribute_values(key, value);
}
let result = scan.send().await?;
let mut links = Vec::new();
if let Some(items) = result.items {
for item in items {
links.push(self.item_to_link(&item).await?);
}
}
Ok(links)
}
async fn update(&self, id: &Uuid, updated_link: LinkEntity) -> Result<LinkEntity> {
self.get(id)
.await?
.ok_or_else(|| anyhow::anyhow!("Link not found"))?;
let item = self.link_to_item(&updated_link).await?;
self.client
.put_item()
.table_name(&self.table_name)
.set_item(Some(item))
.send()
.await?;
Ok(updated_link)
}
async fn delete(&self, id: &Uuid) -> Result<()> {
let key = HashMap::from([("id".to_string(), AttributeValue::S(id.to_string()))]);
self.client
.delete_item()
.table_name(&self.table_name)
.set_key(Some(key))
.send()
.await?;
Ok(())
}
async fn delete_by_entity(&self, entity_id: &Uuid) -> Result<()> {
let source_links = self.find_by_source(entity_id, None, None).await?;
let target_links = self.find_by_target(entity_id, None, None).await?;
for link in source_links.into_iter().chain(target_links.into_iter()) {
self.delete(&link.id).await?;
}
Ok(())
}
}
#[cfg(test)]
#[cfg(feature = "dynamodb")]
#[allow(dead_code)]
mod tests {
use super::*;
use serde_json::json;
crate::impl_data_entity!(TestItem, "test_item", ["name"], {
quantity: f64,
});
fn test_client() -> DynamoDBClient {
let config = aws_sdk_dynamodb::Config::builder()
.behavior_version(aws_sdk_dynamodb::config::BehaviorVersion::latest())
.region(aws_sdk_dynamodb::config::Region::new("us-east-1"))
.build();
DynamoDBClient::from_conf(config)
}
fn make_data_service() -> DynamoDBDataService<TestItem> {
DynamoDBDataService::new(test_client(), "test_items".to_string())
}
fn make_link_service() -> DynamoDBLinkService {
DynamoDBLinkService::new(test_client(), "test_links".to_string())
}
#[tokio::test]
async fn entity_to_item_includes_id() {
let svc = make_data_service();
let entity = TestItem::new("Widget".to_string(), "active".to_string(), 42.0);
let item = svc.entity_to_item(&entity).await.unwrap();
assert!(item.contains_key("id"));
match &item["id"] {
AttributeValue::S(s) => assert_eq!(s, &entity.id.to_string()),
other => panic!("expected S for id, got {:?}", other),
}
}
#[tokio::test]
async fn entity_to_item_string_field() {
let svc = make_data_service();
let entity = TestItem::new("Widget".to_string(), "active".to_string(), 7.0);
let item = svc.entity_to_item(&entity).await.unwrap();
match &item["name"] {
AttributeValue::S(s) => assert_eq!(s, "Widget"),
other => panic!("expected S for name, got {:?}", other),
}
}
#[tokio::test]
async fn entity_to_item_number_field() {
let svc = make_data_service();
let entity = TestItem::new("Gadget".to_string(), "active".to_string(), 99.0);
let item = svc.entity_to_item(&entity).await.unwrap();
match &item["quantity"] {
AttributeValue::N(n) => assert_eq!(n, "99"),
other => panic!("expected N for quantity, got {:?}", other),
}
}
#[tokio::test]
async fn entity_to_item_skips_null() {
let svc = make_data_service();
let entity = TestItem::new("NoDelete".to_string(), "active".to_string(), 1.0);
let item = svc.entity_to_item(&entity).await.unwrap();
assert!(
!item.contains_key("deleted_at"),
"null field deleted_at should be skipped"
);
}
#[tokio::test]
async fn item_to_entity_string_field() {
let svc = make_data_service();
let id = Uuid::new_v4();
let now = chrono::Utc::now().to_rfc3339();
let mut item = HashMap::new();
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
item.insert(
"type".to_string(),
AttributeValue::S("test_item".to_string()),
);
item.insert("name".to_string(), AttributeValue::S("Alpha".to_string()));
item.insert(
"status".to_string(),
AttributeValue::S("active".to_string()),
);
item.insert("created_at".to_string(), AttributeValue::S(now.clone()));
item.insert("updated_at".to_string(), AttributeValue::S(now.clone()));
item.insert("quantity".to_string(), AttributeValue::N("10".to_string()));
let entity: TestItem = svc.item_to_entity(&item).await.unwrap();
assert_eq!(entity.name, "Alpha");
assert_eq!(entity.status, "active");
}
#[tokio::test]
async fn item_to_entity_number_field() {
let svc = make_data_service();
let id = Uuid::new_v4();
let now = chrono::Utc::now().to_rfc3339();
let mut item = HashMap::new();
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
item.insert(
"type".to_string(),
AttributeValue::S("test_item".to_string()),
);
item.insert("name".to_string(), AttributeValue::S("Beta".to_string()));
item.insert(
"status".to_string(),
AttributeValue::S("active".to_string()),
);
item.insert("created_at".to_string(), AttributeValue::S(now.clone()));
item.insert("updated_at".to_string(), AttributeValue::S(now.clone()));
item.insert("quantity".to_string(), AttributeValue::N("55".to_string()));
let entity: TestItem = svc.item_to_entity(&item).await.unwrap();
assert!((entity.quantity - 55.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn item_to_entity_bool_field() {
let svc = make_data_service();
let id = Uuid::new_v4();
let now = chrono::Utc::now().to_rfc3339();
let mut item = HashMap::new();
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
item.insert(
"type".to_string(),
AttributeValue::S("test_item".to_string()),
);
item.insert("name".to_string(), AttributeValue::S("Gamma".to_string()));
item.insert(
"status".to_string(),
AttributeValue::S("active".to_string()),
);
item.insert("created_at".to_string(), AttributeValue::S(now.clone()));
item.insert("updated_at".to_string(), AttributeValue::S(now.clone()));
item.insert("quantity".to_string(), AttributeValue::N("0".to_string()));
let entity: TestItem = svc.item_to_entity(&item).await.unwrap();
assert_eq!(entity.name, "Gamma");
}
#[tokio::test]
async fn entity_item_roundtrip() {
let svc = make_data_service();
let entity = TestItem::new("Roundtrip".to_string(), "draft".to_string(), 123.0);
let item = svc.entity_to_item(&entity).await.unwrap();
let recovered: TestItem = svc.item_to_entity(&item).await.unwrap();
assert_eq!(recovered.id, entity.id);
assert_eq!(recovered.name, "Roundtrip");
assert_eq!(recovered.status, "draft");
assert!((recovered.quantity - 123.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn link_to_item_basic() {
let svc = make_link_service();
let src = Uuid::new_v4();
let tgt = Uuid::new_v4();
let link = LinkEntity::new("owns", src, tgt, Some(json!({"role": "admin"})));
let item = svc.link_to_item(&link).await.unwrap();
assert!(item.contains_key("id"));
assert!(item.contains_key("source_id"));
assert!(item.contains_key("target_id"));
assert!(item.contains_key("link_type"));
match &item["source_id"] {
AttributeValue::S(s) => assert_eq!(s, &src.to_string()),
other => panic!("expected S for source_id, got {:?}", other),
}
match &item["target_id"] {
AttributeValue::S(s) => assert_eq!(s, &tgt.to_string()),
other => panic!("expected S for target_id, got {:?}", other),
}
}
#[tokio::test]
async fn link_to_item_skips_null_metadata() {
let svc = make_link_service();
let link = LinkEntity::new("owns", Uuid::new_v4(), Uuid::new_v4(), None);
let item = svc.link_to_item(&link).await.unwrap();
assert!(
!item.contains_key("metadata"),
"null metadata should be skipped in link_to_item output"
);
}
#[tokio::test]
async fn item_to_link_basic() {
let svc = make_link_service();
let id = Uuid::new_v4();
let src = Uuid::new_v4();
let tgt = Uuid::new_v4();
let now = chrono::Utc::now().to_rfc3339();
let mut item = HashMap::new();
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
item.insert("type".to_string(), AttributeValue::S("link".to_string()));
item.insert(
"link_type".to_string(),
AttributeValue::S("owns".to_string()),
);
item.insert("source_id".to_string(), AttributeValue::S(src.to_string()));
item.insert("target_id".to_string(), AttributeValue::S(tgt.to_string()));
item.insert(
"status".to_string(),
AttributeValue::S("active".to_string()),
);
item.insert("created_at".to_string(), AttributeValue::S(now.clone()));
item.insert("updated_at".to_string(), AttributeValue::S(now.clone()));
let link = svc.item_to_link(&item).await.unwrap();
assert_eq!(link.id, id);
assert_eq!(link.source_id, src);
assert_eq!(link.target_id, tgt);
assert_eq!(link.link_type, "owns");
assert_eq!(link.status, "active");
}
#[tokio::test]
async fn item_to_link_metadata_json_parsing() {
let svc = make_link_service();
let id = Uuid::new_v4();
let src = Uuid::new_v4();
let tgt = Uuid::new_v4();
let now = chrono::Utc::now().to_rfc3339();
let meta_json = json!({"role": "admin", "level": 5});
let mut item = HashMap::new();
item.insert("id".to_string(), AttributeValue::S(id.to_string()));
item.insert("type".to_string(), AttributeValue::S("link".to_string()));
item.insert(
"link_type".to_string(),
AttributeValue::S("works_at".to_string()),
);
item.insert("source_id".to_string(), AttributeValue::S(src.to_string()));
item.insert("target_id".to_string(), AttributeValue::S(tgt.to_string()));
item.insert(
"status".to_string(),
AttributeValue::S("active".to_string()),
);
item.insert("created_at".to_string(), AttributeValue::S(now.clone()));
item.insert("updated_at".to_string(), AttributeValue::S(now.clone()));
item.insert(
"metadata".to_string(),
AttributeValue::S(serde_json::to_string(&meta_json).unwrap()),
);
let link = svc.item_to_link(&item).await.unwrap();
let meta = link
.metadata
.expect("metadata should be Some after parsing");
assert_eq!(meta["role"], "admin");
assert_eq!(meta["level"], 5);
}
}