use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use deadpool_postgres::Client;
use helios_fhir::FhirVersion;
use serde_json::Value;
use crate::core::{Transaction, TransactionOptions, TransactionProvider};
use crate::error::{
BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
};
use crate::search::SearchParameterExtractor;
use crate::tenant::TenantContext;
use crate::types::StoredResource;
use super::PostgresBackend;
use super::search::writer::PostgresSearchIndexWriter;
/// Builds a [`StorageError::Backend`] internal error attributed to the
/// `"postgres"` backend, carrying `message` and no underlying source error.
fn internal_error(message: String) -> StorageError {
    let failure = BackendError::Internal {
        backend_name: String::from("postgres"),
        message,
        source: None,
    };
    StorageError::Backend(failure)
}
/// Wraps `message` in a [`BackendError::SerializationError`] storage error.
// Nothing in the code visible alongside this helper calls it, hence the lint allowance.
#[allow(dead_code)]
fn serialization_error(message: String) -> StorageError {
    let failure = BackendError::SerializationError { message };
    StorageError::Backend(failure)
}
/// A storage transaction bound to one pooled PostgreSQL connection.
///
/// Instances are created by `PostgresTransaction::new`, which issues `BEGIN`
/// on the connection, and are finished through `commit` or `rollback`.
pub struct PostgresTransaction {
/// Connection the transaction's statements run on. The constructor always
/// stores `Some`; the `client()` accessor reports `InvalidTransaction` if it
/// is ever `None`.
client: Option<Client>,
/// `true` from construction until `commit` or `rollback` succeeds; every
/// operation refuses to run once this is `false`.
active: bool,
/// Tenant whose id scopes every query issued by this transaction.
tenant: TenantContext,
/// Extractor used by `index_resource` to derive search index entries.
search_extractor: Arc<SearchParameterExtractor>,
/// When `true`, `index_resource` returns immediately without touching the
/// `search_index` table.
search_offloaded: bool,
}
// Debug output lists only the `active` flag and the tenant; the connection,
// extractor and offload flag are left out.
impl std::fmt::Debug for PostgresTransaction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PostgresTransaction");
        builder.field("active", &self.active);
        builder.field("tenant", &self.tenant);
        builder.finish()
    }
}
impl PostgresTransaction {
    /// Starts a transaction on `client` by executing `BEGIN` and wraps the
    /// connection together with the tenant and search-indexing settings.
    ///
    /// # Errors
    /// Returns [`TransactionError::RolledBack`] if the `BEGIN` statement fails.
    async fn new(
        client: Client,
        tenant: TenantContext,
        search_extractor: Arc<SearchParameterExtractor>,
        search_offloaded: bool,
    ) -> StorageResult<Self> {
        if let Err(e) = client.execute("BEGIN", &[]).await {
            let reason = format!("Failed to begin transaction: {}", e);
            return Err(StorageError::Transaction(TransactionError::RolledBack {
                reason,
            }));
        }

        Ok(Self {
            client: Some(client),
            active: true,
            tenant,
            search_extractor,
            search_offloaded,
        })
    }

    /// Borrows the underlying connection.
    ///
    /// # Errors
    /// Returns [`TransactionError::InvalidTransaction`] when no connection is held.
    fn client(&self) -> StorageResult<&Client> {
        match self.client.as_ref() {
            Some(conn) => Ok(conn),
            None => Err(StorageError::Transaction(
                TransactionError::InvalidTransaction,
            )),
        }
    }

    /// Rebuilds the `search_index` rows of one resource inside this transaction.
    ///
    /// Does nothing when search indexing is offloaded. Otherwise the existing
    /// rows for the resource are deleted first, then one entry is written per
    /// value produced by the search parameter extractor.
    ///
    /// # Errors
    /// Returns an internal backend error if clearing the index or extracting
    /// values fails, and propagates any error from writing an entry.
    async fn index_resource(
        &self,
        tenant_id: &str,
        resource_type: &str,
        resource_id: &str,
        resource: &Value,
    ) -> StorageResult<()> {
        if !self.search_offloaded {
            let conn = self.client()?;

            // Remove stale entries before writing the fresh set.
            let cleared = conn
                .execute(
                    "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
                    &[&tenant_id, &resource_type, &resource_id],
                )
                .await;
            if let Err(e) = cleared {
                return Err(internal_error(format!(
                    "Failed to clear search index: {}",
                    e
                )));
            }

            let entries = match self.search_extractor.extract(resource, resource_type) {
                Ok(found) => found,
                Err(e) => {
                    return Err(internal_error(format!(
                        "Search parameter extraction failed: {}",
                        e
                    )));
                }
            };

            for entry in entries {
                PostgresSearchIndexWriter::write_entry(
                    conn,
                    tenant_id,
                    resource_type,
                    resource_id,
                    &entry,
                )
                .await?;
            }

            tracing::debug!(
                "Indexed resource {}/{} within transaction",
                resource_type,
                resource_id
            );
        }

        Ok(())
    }
}
#[async_trait]
impl Transaction for PostgresTransaction {
async fn create(
&mut self,
resource_type: &str,
resource: Value,
) -> StorageResult<StoredResource> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
let client = self.client()?;
let tenant_id = self.tenant.tenant_id().as_str();
let id = resource
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let exists = client
.query_opt(
"SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
&[&tenant_id, &resource_type, &id],
)
.await
.map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
if exists.is_some() {
return Err(StorageError::Resource(ResourceError::AlreadyExists {
resource_type: resource_type.to_string(),
id: id.to_string(),
}));
}
let mut data = resource.clone();
if let Some(obj) = data.as_object_mut() {
obj.insert("id".to_string(), Value::String(id.clone()));
obj.insert(
"resourceType".to_string(),
Value::String(resource_type.to_string()),
);
}
let now = Utc::now();
let version_id = "1";
let fhir_version = FhirVersion::default();
let fhir_version_str = fhir_version.as_mime_param();
let is_deleted = false;
client
.execute(
"INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
)
.await
.map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
client
.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
)
.await
.map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
self.index_resource(tenant_id, resource_type, &id, &data)
.await?;
Ok(StoredResource::from_storage(
resource_type,
&id,
version_id,
self.tenant.tenant_id().clone(),
data,
now,
now,
None,
fhir_version,
))
}
async fn read(
&mut self,
resource_type: &str,
id: &str,
) -> StorageResult<Option<StoredResource>> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
let client = self.client()?;
let tenant_id = self.tenant.tenant_id().as_str();
let row = client
.query_opt(
"SELECT version_id, data, last_updated, is_deleted, fhir_version
FROM resources
WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
&[&tenant_id, &resource_type, &id],
)
.await
.map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
match row {
Some(row) => {
let version_id: String = row.get(0);
let data: serde_json::Value = row.get(1);
let last_updated: chrono::DateTime<Utc> = row.get(2);
let is_deleted: bool = row.get(3);
let fhir_version_str: String = row.get(4);
if is_deleted {
return Ok(None);
}
let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
Ok(Some(StoredResource::from_storage(
resource_type,
id,
version_id,
self.tenant.tenant_id().clone(),
data,
last_updated,
last_updated,
None,
fhir_version,
)))
}
None => Ok(None),
}
}
async fn update(
&mut self,
current: &StoredResource,
resource: Value,
) -> StorageResult<StoredResource> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
let client = self.client()?;
let tenant_id = self.tenant.tenant_id().as_str();
let resource_type = current.resource_type();
let id = current.id();
let row = client
.query_opt(
"SELECT version_id FROM resources
WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
&[&tenant_id, &resource_type, &id],
)
.await
.map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
let db_version = match row {
Some(row) => row.get::<_, String>(0),
None => {
return Err(StorageError::Resource(ResourceError::NotFound {
resource_type: resource_type.to_string(),
id: id.to_string(),
}));
}
};
if db_version != current.version_id() {
return Err(StorageError::Concurrency(
ConcurrencyError::VersionConflict {
resource_type: resource_type.to_string(),
id: id.to_string(),
expected_version: current.version_id().to_string(),
actual_version: db_version,
},
));
}
let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
let new_version_str = new_version.to_string();
let mut data = resource.clone();
if let Some(obj) = data.as_object_mut() {
obj.insert("id".to_string(), Value::String(id.to_string()));
obj.insert(
"resourceType".to_string(),
Value::String(resource_type.to_string()),
);
}
let now = Utc::now();
let fhir_version = current.fhir_version();
let fhir_version_str = fhir_version.as_mime_param();
let is_deleted = false;
client
.execute(
"UPDATE resources SET version_id = $1, data = $2, last_updated = $3
WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
&[
&new_version_str,
&data,
&now,
&tenant_id,
&resource_type,
&id,
],
)
.await
.map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
client
.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
)
.await
.map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
self.index_resource(tenant_id, resource_type, id, &data)
.await?;
Ok(StoredResource::from_storage(
resource_type,
id,
new_version_str,
self.tenant.tenant_id().clone(),
data,
now,
now,
None,
fhir_version,
))
}
async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
let client = self.client()?;
let tenant_id = self.tenant.tenant_id().as_str();
let row = client
.query_opt(
"SELECT version_id, data, fhir_version FROM resources
WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
&[&tenant_id, &resource_type, &id],
)
.await
.map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
let (current_version, data, fhir_version_str) = match row {
Some(row) => {
let v: String = row.get(0);
let d: serde_json::Value = row.get(1);
let f: String = row.get(2);
(v, d, f)
}
None => {
return Err(StorageError::Resource(ResourceError::NotFound {
resource_type: resource_type.to_string(),
id: id.to_string(),
}));
}
};
let now = Utc::now();
let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
let new_version_str = new_version.to_string();
let is_deleted = true;
client
.execute(
"UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
&[&now, &new_version_str, &tenant_id, &resource_type, &id],
)
.await
.map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
client
.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
)
.await
.map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
Ok(())
}
async fn commit(mut self: Box<Self>) -> StorageResult<()> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
if let Some(client) = self.client.as_ref() {
client.execute("COMMIT", &[]).await.map_err(|e| {
StorageError::Transaction(TransactionError::RolledBack {
reason: format!("Commit failed: {}", e),
})
})?;
}
self.active = false;
Ok(())
}
async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
if !self.active {
return Err(StorageError::Transaction(
TransactionError::InvalidTransaction,
));
}
if let Some(client) = self.client.as_ref() {
client.execute("ROLLBACK", &[]).await.map_err(|e| {
StorageError::Transaction(TransactionError::RolledBack {
reason: format!("Rollback failed: {}", e),
})
})?;
}
self.active = false;
Ok(())
}
fn tenant(&self) -> &TenantContext {
&self.tenant
}
fn is_active(&self) -> bool {
self.active
}
}
impl Drop for PostgresTransaction {
fn drop(&mut self) {
if self.active {
tracing::warn!("PostgreSQL transaction dropped without explicit commit or rollback");
}
}
}
#[async_trait]
impl TransactionProvider for PostgresBackend {
    type Transaction = PostgresTransaction;

    /// Obtains a connection from the backend and opens a transaction on it
    /// for `tenant`, carrying over the backend's search extractor and
    /// search-offload setting.
    ///
    /// The supplied options are not consulted by this implementation.
    ///
    /// # Errors
    /// Propagates failures from acquiring the connection or from starting the
    /// transaction.
    async fn begin_transaction(
        &self,
        tenant: &TenantContext,
        _options: TransactionOptions,
    ) -> StorageResult<Self::Transaction> {
        let connection = self.get_client().await?;
        let owned_tenant = tenant.clone();
        let extractor = self.search_extractor().clone();
        let offloaded = self.is_search_offloaded();

        PostgresTransaction::new(connection, owned_tenant, extractor, offloaded).await
    }
}