use crate::core::link::LinkEntity;
use crate::core::{Data, DataService, LinkService};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Serialize;
use serde::de::DeserializeOwned;
use sqlx::{FromRow, PgPool};
use uuid::Uuid;
/// One record of the `entities` table, as read from or written to Postgres.
///
/// The common entity fields live in dedicated columns; everything else the
/// entity serializes to is kept in the `data` JSON column.
#[derive(Debug, FromRow)]
struct EntityRow {
id: Uuid,
// Discriminator used by every query in this file to scope rows to one `T`.
entity_type: String,
name: String,
status: String,
tenant_id: Option<Uuid>,
// Serialized entity with the `ENTITY_COMMON_FIELDS` keys removed.
data: serde_json::Value,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
deleted_at: Option<DateTime<Utc>>,
}
/// JSON keys that are removed from the serialized entity before it is stored
/// in the `data` column, because each one is persisted in its own column.
const ENTITY_COMMON_FIELDS: &[&str] = &[
"id",
"name",
"status",
"tenant_id",
"created_at",
"updated_at",
"deleted_at",
];
/// Column names that `search` compares directly (and interpolates into SQL);
/// any other field name is looked up as a key inside the `data` JSON instead.
const SEARCHABLE_COLUMNS: &[&str] = &["name", "status"];
/// `DataService` implementation that stores entities of type `T` in the
/// Postgres `entities` table.
#[derive(Clone, Debug)]
pub struct PostgresDataService<T> {
// Connection pool every query in this service runs on.
pool: PgPool,
// No `T` value is stored; the marker only ties the service to its entity type.
_marker: std::marker::PhantomData<T>,
}
impl<T> PostgresDataService<T> {
    /// Builds a service for entity type `T` on top of an existing pool.
    pub fn new(pool: PgPool) -> Self {
        let _marker = std::marker::PhantomData;
        Self { pool, _marker }
    }

    /// Borrows the connection pool this service runs its queries on.
    pub fn pool(&self) -> &PgPool {
        let Self { pool, .. } = self;
        pool
    }
}
impl<T: Data + Serialize + DeserializeOwned> PostgresDataService<T> {
    /// Value written to, and filtered on, the `entity_type` column for `T`.
    fn entity_type_name() -> &'static str {
        T::resource_name_singular()
    }

    /// Splits an entity into its column values plus a JSON remainder.
    ///
    /// # Errors
    /// Fails when `entity` cannot be serialized to JSON.
    fn entity_to_row(entity: &T) -> Result<EntityRow> {
        let mut data = serde_json::to_value(entity)
            .map_err(|e| anyhow!("Failed to serialize entity: {}", e))?;

        // Common fields have dedicated columns, so they are dropped from the
        // JSON payload. A payload that is not a JSON object is kept unchanged.
        if let serde_json::Value::Object(extra) = &mut data {
            ENTITY_COMMON_FIELDS.iter().for_each(|field| {
                extra.remove(*field);
            });
        }

        Ok(EntityRow {
            id: entity.id(),
            entity_type: Self::entity_type_name().to_string(),
            name: entity.name().to_string(),
            status: entity.status().to_string(),
            tenant_id: entity.tenant_id(),
            data,
            created_at: entity.created_at(),
            updated_at: entity.updated_at(),
            deleted_at: entity.deleted_at(),
        })
    }

    /// Rebuilds a `T` by merging the row's columns back into its JSON payload.
    ///
    /// Column values overwrite same-named keys in `data`, except for
    /// `entity_type` / `type`, which are only filled in when absent.
    ///
    /// # Errors
    /// Fails when the merged JSON does not deserialize into `T`.
    fn row_to_entity(row: EntityRow) -> Result<T> {
        let EntityRow {
            id,
            entity_type,
            name,
            status,
            tenant_id,
            data,
            created_at,
            updated_at,
            deleted_at,
        } = row;

        // Anything other than a JSON object is treated as an empty payload.
        let mut fields = match data {
            serde_json::Value::Object(map) => map,
            _ => serde_json::Map::new(),
        };

        fields.insert("id".into(), serde_json::to_value(id)?);
        // The type discriminator is only a fallback: a value already present
        // in the payload wins over the column.
        for key in ["entity_type", "type"].iter() {
            if !fields.contains_key(*key) {
                fields.insert((*key).into(), serde_json::to_value(&entity_type)?);
            }
        }
        fields.insert("name".into(), serde_json::to_value(&name)?);
        fields.insert("status".into(), serde_json::to_value(&status)?);
        fields.insert("created_at".into(), serde_json::to_value(created_at)?);
        fields.insert("updated_at".into(), serde_json::to_value(updated_at)?);
        fields.insert("deleted_at".into(), serde_json::to_value(deleted_at)?);
        // NOTE(review): unlike the other columns, a NULL tenant_id neither
        // writes a null nor removes a `tenant_id` key already present in
        // `data` — confirm that is intended.
        if let Some(tid) = tenant_id {
            fields.insert("tenant_id".into(), serde_json::to_value(tid)?);
        }

        serde_json::from_value::<T>(serde_json::Value::Object(fields))
            .map_err(|e| anyhow!("Failed to deserialize entity from row: {}", e))
    }
}
#[async_trait]
impl<T: Data + Serialize + DeserializeOwned> DataService<T> for PostgresDataService<T> {
    /// Inserts `entity` and returns it rebuilt from the row Postgres returned.
    async fn create(&self, entity: T) -> Result<T> {
        const INSERT_SQL: &str = "INSERT INTO entities (id, entity_type, name, status, tenant_id, data, created_at, updated_at, deleted_at) \
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
             RETURNING *";

        let record = Self::entity_to_row(&entity)?;
        let inserted = sqlx::query_as::<_, EntityRow>(INSERT_SQL)
            .bind(record.id)
            .bind(&record.entity_type)
            .bind(&record.name)
            .bind(&record.status)
            .bind(record.tenant_id)
            .bind(&record.data)
            .bind(record.created_at)
            .bind(record.updated_at)
            .bind(record.deleted_at)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to create entity: {}", e))?;

        Self::row_to_entity(inserted)
    }

    /// Looks up one entity of type `T` by id; `Ok(None)` when no row matches.
    async fn get(&self, id: &Uuid) -> Result<Option<T>> {
        let found = sqlx::query_as::<_, EntityRow>(
            "SELECT * FROM entities WHERE id = $1 AND entity_type = $2",
        )
        .bind(id)
        .bind(Self::entity_type_name())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| anyhow!("Failed to get entity: {}", e))?;

        found.map(Self::row_to_entity).transpose()
    }

    /// Returns every entity of type `T`, newest `created_at` first.
    async fn list(&self) -> Result<Vec<T>> {
        let records = sqlx::query_as::<_, EntityRow>(
            "SELECT * FROM entities WHERE entity_type = $1 ORDER BY created_at DESC",
        )
        .bind(Self::entity_type_name())
        .fetch_all(&self.pool)
        .await
        .map_err(|e| anyhow!("Failed to list entities: {}", e))?;

        // Stop at the first row that fails to convert.
        let mut entities = Vec::with_capacity(records.len());
        for record in records {
            entities.push(Self::row_to_entity(record)?);
        }
        Ok(entities)
    }

    /// Overwrites the mutable columns of the row identified by `id`.
    ///
    /// `id`, `entity_type` and `created_at` are not part of the `SET` list.
    ///
    /// # Errors
    /// Returns "Entity not found" when no row of this type has the given id.
    async fn update(&self, id: &Uuid, entity: T) -> Result<T> {
        const UPDATE_SQL: &str = "UPDATE entities \
             SET name = $1, status = $2, tenant_id = $3, data = $4, updated_at = $5, deleted_at = $6 \
             WHERE id = $7 AND entity_type = $8 \
             RETURNING *";

        let record = Self::entity_to_row(&entity)?;
        let updated = sqlx::query_as::<_, EntityRow>(UPDATE_SQL)
            .bind(&record.name)
            .bind(&record.status)
            .bind(record.tenant_id)
            .bind(&record.data)
            .bind(record.updated_at)
            .bind(record.deleted_at)
            .bind(id)
            .bind(Self::entity_type_name())
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to update entity: {}", e))?;

        updated
            .ok_or_else(|| anyhow!("Entity not found: {}", id))
            .and_then(Self::row_to_entity)
    }

    /// Deletes the row identified by `id`.
    ///
    /// The number of affected rows is not inspected, so deleting an id that
    /// does not exist also returns `Ok(())`.
    async fn delete(&self, id: &Uuid) -> Result<()> {
        let _outcome = sqlx::query("DELETE FROM entities WHERE id = $1 AND entity_type = $2")
            .bind(id)
            .bind(Self::entity_type_name())
            .execute(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to delete entity: {}", e))?;
        Ok(())
    }

    /// Finds entities of type `T` whose `field` equals `value` exactly.
    ///
    /// Names listed in `SEARCHABLE_COLUMNS` are compared against the column;
    /// every other name is compared against the text value of that key in
    /// the `data` JSON.
    ///
    /// NOTE(review): the remaining common fields (`id`, `tenant_id`, the
    /// timestamps) are stripped from `data` on write, so searching for them
    /// takes the JSON branch and will not match rows written by this service.
    async fn search(&self, field: &str, value: &str) -> Result<Vec<T>> {
        let entity_type = Self::entity_type_name();
        // Interpolate the whitelist's own string, never the caller's input.
        let column = SEARCHABLE_COLUMNS
            .iter()
            .copied()
            .find(|candidate| *candidate == field);

        let records = match column {
            Some(column) => {
                let sql = format!(
                    "SELECT * FROM entities WHERE entity_type = $1 AND {} = $2",
                    column
                );
                sqlx::query_as::<_, EntityRow>(&sql)
                    .bind(entity_type)
                    .bind(value)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(|e| anyhow!("Failed to search entities: {}", e))?
            }
            None => sqlx::query_as::<_, EntityRow>(
                "SELECT * FROM entities WHERE entity_type = $1 AND data->>$2 = $3",
            )
            .bind(entity_type)
            .bind(field)
            .bind(value)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to search entities by JSONB field: {}", e))?,
        };

        records.into_iter().map(Self::row_to_entity).collect()
    }
}
/// One record of the `links` table, as read from or written to Postgres.
#[derive(Debug, FromRow)]
struct LinkRow {
id: Uuid,
entity_type: String,
link_type: String,
source_id: Uuid,
target_id: Uuid,
// Not carried by `LinkEntity`: `from_link` always writes `None` for these
// two columns and `into_link` drops them.
source_type: Option<String>,
target_type: Option<String>,
status: String,
tenant_id: Option<Uuid>,
// Stored as `{}` when the link has no metadata (see `from_link`).
metadata: serde_json::Value,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
deleted_at: Option<DateTime<Utc>>,
}
impl LinkRow {
    /// Copies a link into row form.
    ///
    /// Missing metadata is stored as an empty JSON object, and the
    /// `source_type` / `target_type` columns are always left empty.
    fn from_link(link: &LinkEntity) -> Self {
        let metadata = link
            .metadata
            .clone()
            .unwrap_or_else(|| serde_json::json!({}));

        Self {
            id: link.id,
            entity_type: link.entity_type.clone(),
            link_type: link.link_type.clone(),
            source_id: link.source_id,
            target_id: link.target_id,
            source_type: None,
            target_type: None,
            status: link.status.clone(),
            tenant_id: link.tenant_id,
            metadata,
            created_at: link.created_at,
            updated_at: link.updated_at,
            deleted_at: link.deleted_at,
        }
    }

    /// Converts the row back into a `LinkEntity`.
    ///
    /// An empty JSON object in `metadata` is mapped to `None`; any other
    /// value is passed through as `Some`.
    fn into_link(self) -> LinkEntity {
        let Self {
            id,
            entity_type,
            link_type,
            source_id,
            target_id,
            status,
            tenant_id,
            metadata,
            created_at,
            updated_at,
            deleted_at,
            ..
        } = self;

        let metadata = Some(metadata).filter(|value| {
            !matches!(value, serde_json::Value::Object(map) if map.is_empty())
        });

        LinkEntity {
            id,
            entity_type,
            created_at,
            updated_at,
            deleted_at,
            status,
            tenant_id,
            link_type,
            source_id,
            target_id,
            metadata,
        }
    }
}
/// `LinkService` implementation that stores links in the Postgres `links` table.
#[derive(Clone, Debug)]
pub struct PostgresLinkService {
// Connection pool every query in this service runs on.
pool: PgPool,
}
impl PostgresLinkService {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
}
#[async_trait]
impl LinkService for PostgresLinkService {
    /// Inserts `link` and returns it rebuilt from the row Postgres returned.
    async fn create(&self, link: LinkEntity) -> Result<LinkEntity> {
        const INSERT_SQL: &str = "INSERT INTO links (id, entity_type, link_type, source_id, target_id, source_type, target_type, status, tenant_id, metadata, created_at, updated_at, deleted_at) \
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \
             RETURNING *";

        let record = LinkRow::from_link(&link);
        let inserted = sqlx::query_as::<_, LinkRow>(INSERT_SQL)
            .bind(record.id)
            .bind(&record.entity_type)
            .bind(&record.link_type)
            .bind(record.source_id)
            .bind(record.target_id)
            .bind(&record.source_type)
            .bind(&record.target_type)
            .bind(&record.status)
            .bind(record.tenant_id)
            .bind(&record.metadata)
            .bind(record.created_at)
            .bind(record.updated_at)
            .bind(record.deleted_at)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to create link: {}", e))?;

        Ok(inserted.into_link())
    }

    /// Looks up one link by id; `Ok(None)` when no row matches.
    async fn get(&self, id: &Uuid) -> Result<Option<LinkEntity>> {
        let found = sqlx::query_as::<_, LinkRow>("SELECT * FROM links WHERE id = $1")
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to get link: {}", e))?;

        Ok(found.map(|record| record.into_link()))
    }

    /// Returns every link, newest `created_at` first.
    async fn list(&self) -> Result<Vec<LinkEntity>> {
        let records =
            sqlx::query_as::<_, LinkRow>("SELECT * FROM links ORDER BY created_at DESC")
                .fetch_all(&self.pool)
                .await
                .map_err(|e| anyhow!("Failed to list links: {}", e))?;

        Ok(records.into_iter().map(LinkRow::into_link).collect())
    }

    /// Returns links starting at `source_id`, newest first, optionally
    /// restricted to one `link_type`.
    ///
    /// `_target_type` is accepted but not used by this implementation.
    async fn find_by_source(
        &self,
        source_id: &Uuid,
        link_type: Option<&str>,
        _target_type: Option<&str>,
    ) -> Result<Vec<LinkEntity>> {
        // The link_type predicate is only present when a filter was given,
        // so `$2` exists exactly when a second value gets bound below.
        let sql = match link_type {
            Some(_) => {
                "SELECT * FROM links WHERE source_id = $1 AND link_type = $2 ORDER BY created_at DESC"
            }
            None => "SELECT * FROM links WHERE source_id = $1 ORDER BY created_at DESC",
        };

        let base = sqlx::query_as::<_, LinkRow>(sql).bind(source_id);
        let query = match link_type {
            Some(wanted) => base.bind(wanted),
            None => base,
        };

        let records = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to find links by source: {}", e))?;

        Ok(records.into_iter().map(LinkRow::into_link).collect())
    }

    /// Returns links ending at `target_id`, newest first, optionally
    /// restricted to one `link_type`.
    ///
    /// `_source_type` is accepted but not used by this implementation.
    async fn find_by_target(
        &self,
        target_id: &Uuid,
        link_type: Option<&str>,
        _source_type: Option<&str>,
    ) -> Result<Vec<LinkEntity>> {
        // The link_type predicate is only present when a filter was given,
        // so `$2` exists exactly when a second value gets bound below.
        let sql = match link_type {
            Some(_) => {
                "SELECT * FROM links WHERE target_id = $1 AND link_type = $2 ORDER BY created_at DESC"
            }
            None => "SELECT * FROM links WHERE target_id = $1 ORDER BY created_at DESC",
        };

        let base = sqlx::query_as::<_, LinkRow>(sql).bind(target_id);
        let query = match link_type {
            Some(wanted) => base.bind(wanted),
            None => base,
        };

        let records = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to find links by target: {}", e))?;

        Ok(records.into_iter().map(LinkRow::into_link).collect())
    }

    /// Overwrites the mutable columns of the link identified by `id`.
    ///
    /// `entity_type`, `source_type`, `target_type` and `created_at` are not
    /// part of the `SET` list.
    ///
    /// # Errors
    /// Returns "Link not found" when no row has the given id.
    async fn update(&self, id: &Uuid, link: LinkEntity) -> Result<LinkEntity> {
        const UPDATE_SQL: &str = "UPDATE links \
             SET link_type = $1, source_id = $2, target_id = $3, status = $4, \
             tenant_id = $5, metadata = $6, updated_at = $7, deleted_at = $8 \
             WHERE id = $9 \
             RETURNING *";

        let record = LinkRow::from_link(&link);
        let updated = sqlx::query_as::<_, LinkRow>(UPDATE_SQL)
            .bind(&record.link_type)
            .bind(record.source_id)
            .bind(record.target_id)
            .bind(&record.status)
            .bind(record.tenant_id)
            .bind(&record.metadata)
            .bind(record.updated_at)
            .bind(record.deleted_at)
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to update link: {}", e))?;

        updated
            .map(LinkRow::into_link)
            .ok_or_else(|| anyhow!("Link not found: {}", id))
    }

    /// Deletes the link identified by `id`.
    ///
    /// The number of affected rows is not inspected, so an unknown id also
    /// returns `Ok(())`.
    async fn delete(&self, id: &Uuid) -> Result<()> {
        let _outcome = sqlx::query("DELETE FROM links WHERE id = $1")
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to delete link: {}", e))?;
        Ok(())
    }

    /// Deletes every link that has `entity_id` as its source or its target.
    async fn delete_by_entity(&self, entity_id: &Uuid) -> Result<()> {
        let _outcome = sqlx::query("DELETE FROM links WHERE source_id = $1 OR target_id = $1")
            .bind(entity_id)
            .execute(&self.pool)
            .await
            .map_err(|e| anyhow!("Failed to delete links by entity: {}", e))?;
        Ok(())
    }
}
#[cfg(all(test, feature = "postgres"))]
#[allow(dead_code)]
mod tests {
    use super::*;
    use serde_json::json;

    crate::impl_data_entity!(TestOrder, "test_order", ["name"], {
        amount: f64,
    });

    // Shorthand for the service type whose conversion helpers are under test.
    type OrderService = PostgresDataService<TestOrder>;

    // Builds an active, tenant-less "test_order" row around the given payload.
    fn order_row(id: Uuid, at: DateTime<Utc>, name: &str, data: serde_json::Value) -> EntityRow {
        EntityRow {
            id,
            entity_type: "test_order".into(),
            name: name.into(),
            status: "active".into(),
            tenant_id: None,
            data,
            created_at: at,
            updated_at: at,
            deleted_at: None,
        }
    }

    // Builds a fully populated link with fresh ids and non-empty metadata.
    fn make_link() -> LinkEntity {
        let stamp = Utc::now();
        LinkEntity {
            id: Uuid::new_v4(),
            entity_type: "ownership".into(),
            created_at: stamp,
            updated_at: stamp,
            deleted_at: None,
            status: "active".into(),
            tenant_id: Some(Uuid::new_v4()),
            link_type: "owns".into(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: Some(json!({"priority": "high"})),
        }
    }

    #[test]
    fn entity_to_row_strips_common_fields() {
        let order = TestOrder::new("Widget".into(), "active".into(), 42.5);
        let row = OrderService::entity_to_row(&order).unwrap();
        let payload = row.data.as_object().expect("data should be a JSON object");

        for field in ENTITY_COMMON_FIELDS {
            assert!(
                !payload.contains_key(*field),
                "data should not contain common field '{field}'"
            );
        }
        let amount = payload.get("amount").and_then(|v| v.as_f64());
        assert_eq!(amount, Some(42.5));
    }

    #[test]
    fn entity_to_row_preserves_entity_type() {
        let order = TestOrder::new("Gadget".into(), "active".into(), 10.0);
        let row = OrderService::entity_to_row(&order).unwrap();
        assert_eq!(row.entity_type, "test_order");
    }

    #[test]
    fn row_to_entity_roundtrip() {
        let order = TestOrder::new("Roundtrip".into(), "pending".into(), 99.99);
        let (expected_id, expected_created, expected_updated) =
            (order.id, order.created_at, order.updated_at);

        let row = OrderService::entity_to_row(&order).unwrap();
        let restored = OrderService::row_to_entity(row).unwrap();

        assert_eq!(restored.id, expected_id);
        assert_eq!(restored.name, "Roundtrip");
        assert_eq!(restored.status, "pending");
        assert_eq!(restored.amount, 99.99);
        assert_eq!(restored.created_at, expected_created);
        assert_eq!(restored.updated_at, expected_updated);
        assert!(restored.deleted_at.is_none());
    }

    #[test]
    fn row_to_entity_non_object_data_handled() {
        let now = Utc::now();
        let id = Uuid::new_v4();

        // An object payload carrying the required field converts cleanly.
        let with_amount = order_row(id, now, "NullData", json!({ "amount": 7.5 }));
        let entity = OrderService::row_to_entity(with_amount).unwrap();
        assert_eq!(entity.id, id);
        assert_eq!(entity.name, "NullData");

        // A null payload is treated as an empty object, so the required
        // `amount` field is missing and deserialization must fail.
        let null_payload = order_row(id, now, "NullData", json!(null));
        let err = OrderService::row_to_entity(null_payload).unwrap_err();
        assert!(
            err.to_string().contains("deserialize"),
            "error should mention deserialization: {}",
            err
        );
    }

    #[test]
    fn row_to_entity_entity_type_fallback() {
        let row = order_row(
            Uuid::new_v4(),
            Utc::now(),
            "Fallback",
            json!({ "amount": 1.0 }),
        );
        let entity = OrderService::row_to_entity(row).unwrap();
        assert_eq!(entity.entity_type, "test_order");
    }

    #[test]
    fn link_row_from_link_preserves_fields() {
        let link = make_link();
        let row = LinkRow::from_link(&link);

        assert_eq!(row.id, link.id);
        assert_eq!(row.entity_type, link.entity_type);
        assert_eq!(row.link_type, link.link_type);
        assert_eq!(row.source_id, link.source_id);
        assert_eq!(row.target_id, link.target_id);
        assert_eq!(row.status, link.status);
        assert_eq!(row.tenant_id, link.tenant_id);
        assert_eq!(row.created_at, link.created_at);
        assert_eq!(row.updated_at, link.updated_at);
        assert_eq!(row.deleted_at, link.deleted_at);
        assert_eq!(row.metadata, json!({"priority": "high"}));
        assert!(row.source_type.is_none());
        assert!(row.target_type.is_none());
    }

    #[test]
    fn link_row_into_link_roundtrip() {
        let original = make_link();
        let restored = LinkRow::from_link(&original).into_link();

        assert_eq!(restored.id, original.id);
        assert_eq!(restored.entity_type, original.entity_type);
        assert_eq!(restored.link_type, original.link_type);
        assert_eq!(restored.source_id, original.source_id);
        assert_eq!(restored.target_id, original.target_id);
        assert_eq!(restored.status, original.status);
        assert_eq!(restored.tenant_id, original.tenant_id);
        assert_eq!(restored.created_at, original.created_at);
        assert_eq!(restored.updated_at, original.updated_at);
        assert_eq!(restored.deleted_at, original.deleted_at);
        assert_eq!(restored.metadata, original.metadata);
    }

    #[test]
    fn link_row_into_link_empty_metadata_becomes_none() {
        let link = LinkEntity {
            metadata: None,
            ..make_link()
        };

        let row = LinkRow::from_link(&link);
        assert_eq!(
            row.metadata,
            json!({}),
            "None metadata stored as empty object"
        );

        let restored = row.into_link();
        assert_eq!(restored.metadata, None, "empty object should become None");
    }

    #[test]
    fn link_row_into_link_with_metadata() {
        let link = LinkEntity {
            metadata: Some(json!({"key": "val"})),
            ..make_link()
        };

        let restored = LinkRow::from_link(&link).into_link();
        assert_eq!(
            restored.metadata,
            Some(json!({"key": "val"})),
            "non-empty metadata should survive roundtrip"
        );
    }
}