use async_trait::async_trait;
use chrono::Utc;
use helios_fhir::FhirVersion;
use rusqlite::{ToSql, params};
use serde_json::Value;
use crate::core::history::{
DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
};
use crate::core::transaction::{
BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
};
use crate::core::{
ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
};
use crate::error::TransactionError;
use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
use crate::search::extractor::ExtractedValue;
use crate::search::loader::SearchParameterLoader;
use crate::search::registry::SearchParameterStatus;
use crate::search::reindex::{ReindexableStorage, ResourcePage};
use crate::tenant::TenantContext;
use crate::types::Pagination;
use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
use super::SqliteBackend;
use super::search::writer::SqliteSearchIndexWriter;
/// Wrap an internal SQLite-backend failure message in a `StorageError`.
///
/// Used throughout this module to convert `rusqlite` errors (and other
/// unexpected conditions) into the crate-level error type.
fn internal_error(message: String) -> StorageError {
    let backend_name = "sqlite".to_string();
    let inner = BackendError::Internal {
        backend_name,
        message,
        source: None,
    };
    StorageError::Backend(inner)
}
/// Wrap a JSON (de)serialization failure message in a `StorageError`.
fn serialization_error(message: String) -> StorageError {
    let inner = BackendError::SerializationError { message };
    StorageError::Backend(inner)
}
#[async_trait]
impl ResourceStorage for SqliteBackend {
fn backend_name(&self) -> &'static str {
"sqlite"
}
async fn create(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
fhir_version: FhirVersion,
) -> StorageResult<StoredResource> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str();
let id = resource
.get("id")
.and_then(|v| v.as_str())
.map(String::from)
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let exists: bool = conn
.query_row(
"SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
params![tenant_id, resource_type, id],
|_| Ok(true),
)
.unwrap_or(false);
if exists {
return Err(StorageError::Resource(ResourceError::AlreadyExists {
resource_type: resource_type.to_string(),
id: id.clone(),
}));
}
let mut resource = resource;
if let Some(obj) = resource.as_object_mut() {
obj.insert(
"resourceType".to_string(),
Value::String(resource_type.to_string()),
);
obj.insert("id".to_string(), Value::String(id.clone()));
}
let data = serde_json::to_vec(&resource)
.map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
let now = Utc::now();
let last_updated = now.to_rfc3339();
let version_id = "1";
let fhir_version_str = fhir_version.as_mime_param();
conn.execute(
"INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
)
.map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
conn.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
)
.map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
self.index_resource(&conn, tenant_id, resource_type, &id, &resource)?;
if resource_type == "SearchParameter" {
self.handle_search_parameter_create(&resource)?;
}
Ok(StoredResource::from_storage(
resource_type,
&id,
version_id,
tenant.tenant_id().clone(),
resource,
now,
now,
None,
fhir_version,
))
}
async fn create_or_update(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
resource: Value,
fhir_version: FhirVersion,
) -> StorageResult<(StoredResource, bool)> {
let existing = self.read(tenant, resource_type, id).await?;
if let Some(current) = existing {
let updated = self.update(tenant, ¤t, resource).await?;
Ok((updated, false))
} else {
let mut resource = resource;
if let Some(obj) = resource.as_object_mut() {
obj.insert("id".to_string(), Value::String(id.to_string()));
}
let created = self
.create(tenant, resource_type, resource, fhir_version)
.await?;
Ok((created, true))
}
}
async fn read(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
) -> StorageResult<Option<StoredResource>> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str();
let result = conn.query_row(
"SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
FROM resources
WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
params![tenant_id, resource_type, id],
|row| {
let version_id: String = row.get(0)?;
let data: Vec<u8> = row.get(1)?;
let last_updated: String = row.get(2)?;
let is_deleted: i32 = row.get(3)?;
let deleted_at: Option<String> = row.get(4)?;
let fhir_version: String = row.get(5)?;
Ok((
version_id,
data,
last_updated,
is_deleted,
deleted_at,
fhir_version,
))
},
);
match result {
Ok((version_id, data, last_updated, is_deleted, deleted_at, fhir_version_str)) => {
if is_deleted != 0 {
let deleted_at = deleted_at.and_then(|s| {
chrono::DateTime::parse_from_rfc3339(&s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
});
return Err(StorageError::Resource(ResourceError::Gone {
resource_type: resource_type.to_string(),
id: id.to_string(),
deleted_at,
}));
}
let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
serialization_error(format!("Failed to deserialize resource: {}", e))
})?;
let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
.map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
.with_timezone(&Utc);
let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
Ok(Some(StoredResource::from_storage(
resource_type,
id,
version_id,
tenant.tenant_id().clone(),
json_data,
last_updated,
last_updated,
None,
fhir_version,
)))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
}
}
async fn update(
&self,
tenant: &TenantContext,
current: &StoredResource,
resource: Value,
) -> StorageResult<StoredResource> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str();
let resource_type = current.resource_type();
let id = current.id();
let actual_version: Result<String, _> = conn.query_row(
"SELECT version_id FROM resources
WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
params![tenant_id, resource_type, id],
|row| row.get(0),
);
let actual_version = match actual_version {
Ok(v) => v,
Err(rusqlite::Error::QueryReturnedNoRows) => {
return Err(StorageError::Resource(ResourceError::NotFound {
resource_type: resource_type.to_string(),
id: id.to_string(),
}));
}
Err(e) => {
return Err(internal_error(format!(
"Failed to get current version: {}",
e
)));
}
};
if actual_version != current.version_id() {
return Err(StorageError::Concurrency(
ConcurrencyError::VersionConflict {
resource_type: resource_type.to_string(),
id: id.to_string(),
expected_version: current.version_id().to_string(),
actual_version,
},
));
}
let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
let new_version_str = new_version.to_string();
let mut resource = resource;
if let Some(obj) = resource.as_object_mut() {
obj.insert(
"resourceType".to_string(),
Value::String(resource_type.to_string()),
);
obj.insert("id".to_string(), Value::String(id.to_string()));
}
let data = serde_json::to_vec(&resource)
.map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
let now = Utc::now();
let last_updated = now.to_rfc3339();
conn.execute(
"UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
params![
new_version_str,
data,
last_updated,
tenant_id,
resource_type,
id
],
)
.map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
let fhir_version_str = current.fhir_version().as_mime_param();
conn.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
params![tenant_id, resource_type, id, new_version_str, data, last_updated, fhir_version_str],
)
.map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
self.delete_search_index(&conn, tenant_id, resource_type, id)?;
self.index_resource(&conn, tenant_id, resource_type, id, &resource)?;
if resource_type == "SearchParameter" {
self.handle_search_parameter_update(current.content(), &resource)?;
}
Ok(StoredResource::from_storage(
resource_type,
id,
new_version_str,
tenant.tenant_id().clone(),
resource,
now,
now,
None,
current.fhir_version(),
))
}
async fn delete(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
) -> StorageResult<()> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str();
let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
"SELECT version_id, data, fhir_version FROM resources
WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
params![tenant_id, resource_type, id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
);
let (current_version, data, fhir_version_str) = match result {
Ok(v) => v,
Err(rusqlite::Error::QueryReturnedNoRows) => {
return Err(StorageError::Resource(ResourceError::NotFound {
resource_type: resource_type.to_string(),
id: id.to_string(),
}));
}
Err(e) => {
return Err(internal_error(format!("Failed to check resource: {}", e)));
}
};
let now = Utc::now();
let deleted_at = now.to_rfc3339();
let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
let new_version_str = new_version.to_string();
conn.execute(
"UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
params![deleted_at, new_version_str, tenant_id, resource_type, id],
)
.map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
conn.execute(
"INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
)
.map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
if !self.is_search_offloaded() {
conn.execute(
"DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
params![tenant_id, resource_type, id],
)
.map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
}
if resource_type == "SearchParameter" {
if let Ok(resource_json) = serde_json::from_slice::<Value>(&data) {
self.handle_search_parameter_delete(&resource_json)?;
}
}
Ok(())
}
async fn count(
&self,
tenant: &TenantContext,
resource_type: Option<&str>,
) -> StorageResult<u64> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str();
let count: i64 = if let Some(rt) = resource_type {
conn.query_row(
"SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0",
params![tenant_id, rt],
|row| row.get(0),
)
} else {
conn.query_row(
"SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
params![tenant_id],
|row| row.get(0),
)
}
.map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
Ok(count as u64)
}
}
impl SqliteBackend {
pub(crate) fn index_resource(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
resource: &Value,
) -> StorageResult<()> {
if self.is_search_offloaded() {
return Ok(());
}
match self.index_resource_dynamic(conn, tenant_id, resource_type, resource_id, resource) {
Ok(count) => {
tracing::debug!(
"Dynamically indexed {} values for {}/{}",
count,
resource_type,
resource_id
);
}
Err(e) => {
tracing::warn!(
"Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
resource_type,
resource_id,
e
);
self.index_minimal_fallback(conn, tenant_id, resource_type, resource_id, resource)?;
}
}
self.index_fts_content(conn, tenant_id, resource_type, resource_id, resource)?;
Ok(())
}
fn index_fts_content(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
resource: &Value,
) -> StorageResult<()> {
use super::search::fts::extract_searchable_content;
let fts_exists: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='resource_fts'",
[],
|_| Ok(true),
)
.unwrap_or(false);
if !fts_exists {
return Ok(());
}
let content = extract_searchable_content(resource);
if content.is_empty() {
return Ok(());
}
conn.execute(
"INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
resource_id,
resource_type,
tenant_id,
content.narrative,
content.full_content
],
)
.map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
Ok(())
}
fn index_resource_dynamic(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
resource: &Value,
) -> StorageResult<usize> {
let values = self
.search_extractor()
.extract(resource, resource_type)
.map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
let mut count = 0;
for value in values {
self.write_index_entry(conn, tenant_id, resource_type, resource_id, &value)?;
count += 1;
}
Ok(count)
}
fn write_index_entry(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
value: &ExtractedValue,
) -> StorageResult<()> {
use crate::search::converters::IndexValue;
let normalized_value = match &value.value {
IndexValue::Date {
value: date_str,
precision,
} => {
let normalized_date = Self::normalize_date_for_sqlite(date_str);
let mut normalized = value.clone();
normalized.value = IndexValue::Date {
value: normalized_date,
precision: *precision,
};
Some(normalized)
}
_ => None,
};
let value_to_use = normalized_value.as_ref().unwrap_or(value);
let sql_params = SqliteSearchIndexWriter::to_sql_params(
tenant_id,
resource_type,
resource_id,
value_to_use,
);
let param_refs: Vec<&dyn ToSql> = sql_params
.iter()
.map(|p| self.sql_value_to_ref(p))
.collect();
conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
.map_err(|e| internal_error(format!("Failed to insert search index entry: {}", e)))?;
Ok(())
}
fn normalize_date_for_sqlite(value: &str) -> String {
if value.contains('T') {
value.to_string()
} else if value.len() == 10 {
format!("{}T00:00:00", value)
} else if value.len() == 7 {
format!("{}-01T00:00:00", value)
} else if value.len() == 4 {
format!("{}-01-01T00:00:00", value)
} else {
value.to_string()
}
}
fn sql_value_to_ref<'a>(&'a self, value: &'a super::search::writer::SqlValue) -> &'a dyn ToSql {
use super::search::writer::SqlValue;
match value {
SqlValue::String(s) => s,
SqlValue::OptString(opt) => opt,
SqlValue::Int(i) => i,
SqlValue::OptInt(opt) => opt,
SqlValue::Float(f) => f,
SqlValue::Null => &rusqlite::types::Null,
}
}
pub(crate) fn delete_search_index(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
) -> StorageResult<()> {
if self.is_search_offloaded() {
return Ok(());
}
conn.execute(
"DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
params![tenant_id, resource_type, resource_id],
)
.map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
let _ = conn.execute(
"DELETE FROM resource_fts WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
params![tenant_id, resource_type, resource_id],
);
Ok(())
}
fn index_minimal_fallback(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
resource: &Value,
) -> StorageResult<()> {
if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
self.insert_token_index(conn, tenant_id, resource_type, resource_id, "_id", None, id)?;
}
if let Some(last_updated) = resource
.get("meta")
.and_then(|m| m.get("lastUpdated"))
.and_then(|v| v.as_str())
{
self.insert_date_index(
conn,
tenant_id,
resource_type,
resource_id,
"_lastUpdated",
last_updated,
)?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn insert_token_index(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
param_name: &str,
system: Option<&str>,
code: &str,
) -> StorageResult<()> {
conn.execute(
"INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_system, value_token_code)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![tenant_id, resource_type, resource_id, param_name, system, code],
)
.map_err(|e| internal_error(format!("Failed to insert token index: {}", e)))?;
Ok(())
}
fn insert_date_index(
&self,
conn: &rusqlite::Connection,
tenant_id: &str,
resource_type: &str,
resource_id: &str,
param_name: &str,
value: &str,
) -> StorageResult<()> {
let normalized = if value.contains('T') {
value.to_string()
} else if value.len() == 10 {
format!("{}T00:00:00", value)
} else if value.len() == 7 {
format!("{}-01T00:00:00", value)
} else if value.len() == 4 {
format!("{}-01-01T00:00:00", value)
} else {
value.to_string()
};
conn.execute(
"INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![tenant_id, resource_type, resource_id, param_name, normalized],
)
.map_err(|e| internal_error(format!("Failed to insert date index: {}", e)))?;
Ok(())
}
}
impl SqliteBackend {
    /// Register a newly created SearchParameter resource with the in-memory
    /// search registry. Only parameters whose status is Active are
    /// registered; parse failures are logged and ignored so a malformed
    /// SearchParameter never blocks storage.
    fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
        // NOTE(review): the loader is hard-coded to FhirVersion::R4 even
        // though resources carry their own fhir_version — confirm this is
        // intentional.
        let loader = SearchParameterLoader::new(FhirVersion::R4);
        match loader.parse_resource(resource) {
            Ok(def) => {
                if def.status == SearchParameterStatus::Active {
                    let mut registry = self.search_registry().write();
                    if let Err(e) = registry.register(def) {
                        // Duplicate/conflicting registrations are non-fatal.
                        tracing::debug!("SearchParameter registration skipped: {}", e);
                    }
                }
            }
            Err(e) => {
                tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
            }
        }
        Ok(())
    }

    /// Reconcile the search registry after a SearchParameter update.
    ///
    /// Handles four cases based on whether the old/new resource bodies parse
    /// into definitions: URL change (unregister old, register new if Active),
    /// status-only change, general re-registration, newly-parseable, and
    /// no-longer-parseable. Registry errors are logged, never propagated.
    fn handle_search_parameter_update(
        &self,
        old_resource: &Value,
        new_resource: &Value,
    ) -> StorageResult<()> {
        // NOTE(review): loader hard-coded to R4 here as well — see
        // handle_search_parameter_create.
        let loader = SearchParameterLoader::new(FhirVersion::R4);
        let old_def = loader.parse_resource(old_resource).ok();
        let new_def = loader.parse_resource(new_resource).ok();
        match (old_def, new_def) {
            (Some(old), Some(new)) => {
                let mut registry = self.search_registry().write();
                if old.url != new.url {
                    // Canonical URL changed: drop the old entry, then add the
                    // new one only if it is Active.
                    let _ = registry.unregister(&old.url);
                    if new.status == SearchParameterStatus::Active {
                        let _ = registry.register(new);
                    }
                } else if old.status != new.status {
                    // Same URL, status transition only.
                    if let Err(e) = registry.update_status(&new.url, new.status) {
                        tracing::debug!("SearchParameter status update skipped: {}", e);
                    }
                } else {
                    // Same URL and status: refresh the definition by
                    // re-registering (unregister first so register succeeds).
                    let _ = registry.unregister(&old.url);
                    if new.status == SearchParameterStatus::Active {
                        let _ = registry.register(new);
                    }
                }
            }
            (None, Some(new)) => {
                // Previously unparseable, now valid: register if Active.
                if new.status == SearchParameterStatus::Active {
                    let mut registry = self.search_registry().write();
                    let _ = registry.register(new);
                }
            }
            (Some(old), None) => {
                // Was valid, now unparseable: remove the stale entry.
                let mut registry = self.search_registry().write();
                let _ = registry.unregister(&old.url);
            }
            (None, None) => {
                // Neither version parses; nothing to reconcile.
            }
        }
        Ok(())
    }

    /// Remove a deleted SearchParameter from the registry, keyed by its
    /// canonical `url` field. Missing url or unregister failure is non-fatal.
    fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
        if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
            let mut registry = self.search_registry().write();
            if let Err(e) = registry.unregister(url) {
                tracing::debug!("SearchParameter unregistration skipped: {}", e);
            }
        }
        Ok(())
    }
}
#[async_trait]
impl VersionedStorage for SqliteBackend {
    /// Read a specific historical version from `resource_history`.
    ///
    /// Returns `Ok(None)` when the version does not exist. For a deleted
    /// version, the history row's `last_updated` doubles as the deletion
    /// timestamp.
    async fn vread(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        version_id: &str,
    ) -> StorageResult<Option<StoredResource>> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let result = conn.query_row(
            "SELECT data, last_updated, is_deleted, fhir_version
             FROM resource_history
             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
            params![tenant_id, resource_type, id, version_id],
            |row| {
                let data: Vec<u8> = row.get(0)?;
                let last_updated: String = row.get(1)?;
                let is_deleted: i32 = row.get(2)?;
                let fhir_version: String = row.get(3)?;
                Ok((data, last_updated, is_deleted, fhir_version))
            },
        );
        match result {
            Ok((data, last_updated, is_deleted, fhir_version_str)) => {
                let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
                    serialization_error(format!("Failed to deserialize resource: {}", e))
                })?;
                let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
                    .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
                    .with_timezone(&Utc);
                let deleted_at = if is_deleted != 0 {
                    Some(last_updated)
                } else {
                    None
                };
                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
                Ok(Some(StoredResource::from_storage(
                    resource_type,
                    id,
                    version_id,
                    tenant.tenant_id().clone(),
                    json_data,
                    last_updated,
                    last_updated,
                    deleted_at,
                    fhir_version,
                )))
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(internal_error(format!("Failed to read version: {}", e))),
        }
    }

    /// Conditional update (`If-Match` semantics): succeed only when the
    /// stored version equals `expected_version`, then delegate to `update`.
    ///
    /// # Errors
    /// `ResourceError::NotFound` when absent;
    /// `ConcurrencyError::VersionConflict` on a version mismatch.
    async fn update_with_match(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        expected_version: &str,
        resource: Value,
    ) -> StorageResult<StoredResource> {
        let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
            StorageError::Resource(ResourceError::NotFound {
                resource_type: resource_type.to_string(),
                id: id.to_string(),
            })
        })?;
        if current.version_id() != expected_version {
            return Err(StorageError::Concurrency(
                ConcurrencyError::VersionConflict {
                    resource_type: resource_type.to_string(),
                    id: id.to_string(),
                    expected_version: expected_version.to_string(),
                    actual_version: current.version_id().to_string(),
                },
            ));
        }
        // (Fixed mojibake: `&current` had been corrupted to `&curren;t`.)
        // `update` re-verifies the version, so a concurrent write between the
        // check above and this call still surfaces as a conflict.
        self.update(tenant, &current, resource).await
    }

    /// Conditional delete (`If-Match` semantics): delete only when the stored
    /// version equals `expected_version`.
    ///
    /// NOTE(review): unlike `update_with_match`, the delegated `delete` does
    /// not re-check the version, leaving a small window between the check
    /// here and the delete — confirm single-writer assumptions.
    async fn delete_with_match(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        expected_version: &str,
    ) -> StorageResult<()> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let current_version: Result<String, _> = conn.query_row(
            "SELECT version_id FROM resources
             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
            params![tenant_id, resource_type, id],
            |row| row.get(0),
        );
        let current_version = match current_version {
            Ok(v) => v,
            Err(rusqlite::Error::QueryReturnedNoRows) => {
                return Err(StorageError::Resource(ResourceError::NotFound {
                    resource_type: resource_type.to_string(),
                    id: id.to_string(),
                }));
            }
            Err(e) => {
                return Err(internal_error(format!(
                    "Failed to get current version: {}",
                    e
                )));
            }
        };
        if current_version != expected_version {
            return Err(StorageError::Concurrency(
                ConcurrencyError::VersionConflict {
                    resource_type: resource_type.to_string(),
                    id: id.to_string(),
                    expected_version: expected_version.to_string(),
                    actual_version: current_version,
                },
            ));
        }
        self.delete(tenant, resource_type, id).await
    }

    /// List all version ids recorded for a resource, in ascending numeric
    /// order (versions are stored as strings, hence the CAST).
    async fn list_versions(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
    ) -> StorageResult<Vec<String>> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let mut stmt = conn
            .prepare(
                "SELECT version_id FROM resource_history
                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3
                 ORDER BY CAST(version_id AS INTEGER) ASC",
            )
            .map_err(|e| internal_error(format!("Failed to prepare query: {}", e)))?;
        let versions = stmt
            .query_map(params![tenant_id, resource_type, id], |row| row.get(0))
            .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?
            .filter_map(|r| r.ok())
            .collect();
        Ok(versions)
    }
}
#[async_trait]
impl InstanceHistoryProvider for SqliteBackend {
    /// Page through the version history of one resource instance, newest
    /// version first.
    ///
    /// Pagination is keyset-based: the cursor carries the numeric version id
    /// of the last entry of the previous page, and each page fetches
    /// `count + 1` rows so a further page can be detected without a separate
    /// COUNT query.
    async fn history_instance(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        params: &HistoryParams,
    ) -> StorageResult<HistoryPage> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let mut sql = String::from(
            "SELECT version_id, data, last_updated, is_deleted, fhir_version
             FROM resource_history
             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
        );
        if !params.include_deleted {
            sql.push_str(" AND is_deleted = 0");
        }
        // (Fixed mojibake: `&params` had been corrupted to `&para;ms`.)
        // to_rfc3339() produces a fixed-format timestamp, so interpolating it
        // cannot inject SQL; bound parameters would still be cleaner.
        if let Some(since) = &params.since {
            sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
        }
        if let Some(before) = &params.before {
            sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
        }
        if let Some(cursor) = params.pagination.cursor_value() {
            // The cursor value is parsed to i64 before interpolation, so it
            // is injection-safe as well.
            if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
                sql.push_str(&format!(
                    " AND CAST(version_id AS INTEGER) < {}",
                    version_str.parse::<i64>().unwrap_or(i64::MAX)
                ));
            }
        }
        sql.push_str(" ORDER BY CAST(version_id AS INTEGER) DESC");
        // Fetch one sentinel row beyond the page size to detect more pages.
        sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1));
        let mut stmt = conn
            .prepare(&sql)
            .map_err(|e| internal_error(format!("Failed to prepare history query: {}", e)))?;
        // Materialize all fetched rows once; the previous implementation
        // re-executed the same query a second time just to decide has_more.
        let rows: Vec<(String, Vec<u8>, String, i32, String)> = stmt
            .query_map(params![tenant_id, resource_type, id], |row| {
                Ok((
                    row.get(0)?,
                    row.get(1)?,
                    row.get(2)?,
                    row.get(3)?,
                    row.get(4)?,
                ))
            })
            .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| internal_error(format!("Failed to read history row: {}", e)))?;
        let has_more = rows.len() > params.pagination.count as usize;
        let mut entries = Vec::new();
        let mut last_version: Option<String> = None;
        for (version_id, data, last_updated_str, is_deleted, fhir_version_str) in
            rows.into_iter().take(params.pagination.count as usize)
        {
            let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
                serialization_error(format!("Failed to deserialize resource: {}", e))
            })?;
            let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
                .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
                .with_timezone(&Utc);
            // For deleted rows, last_updated doubles as the deletion time.
            let deleted_at = if is_deleted != 0 {
                Some(last_updated)
            } else {
                None
            };
            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
            let resource = StoredResource::from_storage(
                resource_type,
                id,
                &version_id,
                tenant.tenant_id().clone(),
                json_data,
                last_updated,
                last_updated,
                deleted_at,
                fhir_version,
            );
            // Version "1" is the create; later non-deleted versions are updates.
            let method = if is_deleted != 0 {
                HistoryMethod::Delete
            } else if version_id == "1" {
                HistoryMethod::Post
            } else {
                HistoryMethod::Put
            };
            last_version = Some(version_id);
            entries.push(HistoryEntry {
                resource,
                method,
                timestamp: last_updated,
            });
        }
        let page_info = if let (true, Some(version)) = (has_more, last_version) {
            let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
            PageInfo::with_next(cursor)
        } else {
            PageInfo::end()
        };
        Ok(Page::new(entries, page_info))
    }

    /// Count all history rows (including deletions) for one instance.
    async fn history_instance_count(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
    ) -> StorageResult<u64> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM resource_history
                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                params![tenant_id, resource_type, id],
                |row| row.get(0),
            )
            .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
        Ok(count as u64)
    }

    /// Purge all history rows for an instance except the one matching its
    /// current version. Returns the number of rows removed.
    ///
    /// # Errors
    /// `ResourceError::NotFound` when the resource row does not exist
    /// (deleted resources still have a row and are purgeable).
    async fn delete_instance_history(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
    ) -> StorageResult<u64> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let exists: bool = conn
            .query_row(
                "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                params![tenant_id, resource_type, id],
                |_| Ok(true),
            )
            .unwrap_or(false);
        if !exists {
            return Err(StorageError::Resource(ResourceError::NotFound {
                resource_type: resource_type.to_string(),
                id: id.to_string(),
            }));
        }
        let current_version: String = conn
            .query_row(
                "SELECT version_id FROM resources
                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                params![tenant_id, resource_type, id],
                |row| row.get(0),
            )
            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
        // Keep only the row whose version matches the live resource.
        let deleted = conn
            .execute(
                "DELETE FROM resource_history
                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id != ?4",
                params![tenant_id, resource_type, id, current_version],
            )
            .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
        Ok(deleted as u64)
    }

    /// Delete a single historical version.
    ///
    /// # Errors
    /// - `ResourceError::NotFound` when the resource does not exist.
    /// - `ValidationError::InvalidResource` when the requested version is the
    ///   current one (callers must DELETE the resource instead).
    /// - `ResourceError::VersionNotFound` when the version is absent from
    ///   history.
    async fn delete_version(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        version_id: &str,
    ) -> StorageResult<()> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let current_version: Result<String, _> = conn.query_row(
            "SELECT version_id FROM resources
             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
            params![tenant_id, resource_type, id],
            |row| row.get(0),
        );
        let current_version = match current_version {
            Ok(v) => v,
            Err(rusqlite::Error::QueryReturnedNoRows) => {
                return Err(StorageError::Resource(ResourceError::NotFound {
                    resource_type: resource_type.to_string(),
                    id: id.to_string(),
                }));
            }
            Err(e) => {
                return Err(internal_error(format!(
                    "Failed to get current version: {}",
                    e
                )));
            }
        };
        if version_id == current_version {
            return Err(StorageError::Validation(
                crate::error::ValidationError::InvalidResource {
                    message: format!(
                        "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
                        version_id, resource_type, id
                    ),
                    details: vec![],
                },
            ));
        }
        let version_exists: bool = conn
            .query_row(
                "SELECT 1 FROM resource_history
                 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
                params![tenant_id, resource_type, id, version_id],
                |_| Ok(true),
            )
            .unwrap_or(false);
        if !version_exists {
            return Err(StorageError::Resource(ResourceError::VersionNotFound {
                resource_type: resource_type.to_string(),
                id: id.to_string(),
                version_id: version_id.to_string(),
            }));
        }
        conn.execute(
            "DELETE FROM resource_history
             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
            params![tenant_id, resource_type, id, version_id],
        )
        .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
        Ok(())
    }
}
#[async_trait]
impl TypeHistoryProvider for SqliteBackend {
    /// Returns one page of history for all resources of `resource_type` in
    /// the tenant, newest first, using a keyset cursor of (last_updated, id).
    ///
    /// `since`/`before`/`include_deleted` filters from `params` are applied
    /// directly against the `resource_history` table. One sentinel row beyond
    /// the page size is fetched so the presence of a next page can be decided
    /// without a second query.
    async fn history_type(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        params: &HistoryParams,
    ) -> StorageResult<HistoryPage> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let mut sql = String::from(
            "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
             FROM resource_history
             WHERE tenant_id = ?1 AND resource_type = ?2",
        );
        if !params.include_deleted {
            sql.push_str(" AND is_deleted = 0");
        }
        if let Some(since) = &params.since {
            sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
        }
        if let Some(before) = &params.before {
            sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
        }
        if let Some(cursor) = params.pagination.cursor_value() {
            let sort_values = cursor.sort_values();
            if sort_values.len() >= 2 {
                if let (
                    Some(CursorValue::String(timestamp)),
                    Some(CursorValue::String(resource_id)),
                ) = (sort_values.first(), sort_values.get(1))
                {
                    // Cursor values are client-supplied; double any single
                    // quotes so they cannot break out of the SQL literal.
                    let ts = timestamp.replace('\'', "''");
                    let rid = resource_id.replace('\'', "''");
                    sql.push_str(&format!(
                        " AND (last_updated < '{}' OR (last_updated = '{}' AND id < '{}'))",
                        ts, ts, rid
                    ));
                }
            }
        }
        sql.push_str(" ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC");
        // Fetch one extra (sentinel) row to detect whether another page exists.
        sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1));
        let mut stmt = conn
            .prepare(&sql)
            .map_err(|e| internal_error(format!("Failed to prepare type history query: {}", e)))?;
        let rows = stmt
            .query_map(params![tenant_id, resource_type], |row| {
                let id: String = row.get(0)?;
                let version_id: String = row.get(1)?;
                let data: Vec<u8> = row.get(2)?;
                let last_updated: String = row.get(3)?;
                let is_deleted: i32 = row.get(4)?;
                let fhir_version: String = row.get(5)?;
                Ok((id, version_id, data, last_updated, is_deleted, fhir_version))
            })
            .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
        let mut entries = Vec::new();
        let mut last_entry: Option<(String, String)> = None;
        let mut has_more = false;
        for row in rows {
            let (id, version_id, data, last_updated_str, is_deleted, fhir_version_str) =
                row.map_err(|e| internal_error(format!("Failed to read type history row: {}", e)))?;
            if entries.len() >= params.pagination.count as usize {
                // The sentinel (count + 1)-th row exists, so another page follows.
                has_more = true;
                break;
            }
            let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
                serialization_error(format!("Failed to deserialize resource: {}", e))
            })?;
            let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
                .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
                .with_timezone(&Utc);
            // A tombstone row records its deletion time in last_updated.
            let deleted_at = if is_deleted != 0 {
                Some(last_updated)
            } else {
                None
            };
            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
            let resource = StoredResource::from_storage(
                resource_type,
                &id,
                &version_id,
                tenant.tenant_id().clone(),
                json_data,
                last_updated,
                last_updated,
                deleted_at,
                fhir_version,
            );
            // Infer the HTTP method: tombstones were deletes, version 1 was
            // the original create, everything else was an update.
            let method = if is_deleted != 0 {
                HistoryMethod::Delete
            } else if version_id == "1" {
                HistoryMethod::Post
            } else {
                HistoryMethod::Put
            };
            last_entry = Some((last_updated_str.clone(), id));
            entries.push(HistoryEntry {
                resource,
                method,
                timestamp: last_updated,
            });
        }
        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
            let cursor = PageCursor::new(
                vec![CursorValue::String(timestamp), CursorValue::String(id)],
                resource_type.to_string(),
            );
            PageInfo::with_next(cursor)
        } else {
            PageInfo::end()
        };
        Ok(Page::new(entries, page_info))
    }

    /// Counts every history row (all versions, including tombstones) for the
    /// resource type within the tenant.
    async fn history_type_count(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
    ) -> StorageResult<u64> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM resource_history
                 WHERE tenant_id = ?1 AND resource_type = ?2",
                params![tenant_id, resource_type],
                |row| row.get(0),
            )
            .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
        Ok(count as u64)
    }
}
#[async_trait]
impl SystemHistoryProvider for SqliteBackend {
    /// Returns one page of history across every resource type in the tenant,
    /// newest first, keyed by a (last_updated, resource_type, id) cursor.
    ///
    /// A sentinel row beyond the page size is fetched so the existence of a
    /// next page is known without issuing a second query.
    async fn history_system(
        &self,
        tenant: &TenantContext,
        params: &HistoryParams,
    ) -> StorageResult<HistoryPage> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let mut sql = String::from(
            "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
             FROM resource_history
             WHERE tenant_id = ?1",
        );
        if !params.include_deleted {
            sql.push_str(" AND is_deleted = 0");
        }
        if let Some(since) = &params.since {
            sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
        }
        if let Some(before) = &params.before {
            sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
        }
        if let Some(cursor) = params.pagination.cursor_value() {
            let sort_values = cursor.sort_values();
            if sort_values.len() >= 3 {
                if let (
                    Some(CursorValue::String(timestamp)),
                    Some(CursorValue::String(res_type)),
                    Some(CursorValue::String(res_id)),
                ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
                {
                    // Cursor values are client-supplied; double single quotes
                    // so they cannot escape the SQL string literal.
                    let ts = timestamp.replace('\'', "''");
                    let rt = res_type.replace('\'', "''");
                    let rid = res_id.replace('\'', "''");
                    sql.push_str(&format!(
                        " AND (last_updated < '{}' OR (last_updated = '{}' AND (resource_type < '{}' OR (resource_type = '{}' AND id < '{}'))))",
                        ts, ts, rt, rt, rid
                    ));
                }
            }
        }
        sql.push_str(" ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC");
        // Fetch one extra (sentinel) row to detect whether another page exists.
        sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1));
        let mut stmt = conn.prepare(&sql).map_err(|e| {
            internal_error(format!("Failed to prepare system history query: {}", e))
        })?;
        let rows = stmt
            .query_map(params![tenant_id], |row| {
                let resource_type: String = row.get(0)?;
                let id: String = row.get(1)?;
                let version_id: String = row.get(2)?;
                let data: Vec<u8> = row.get(3)?;
                let last_updated: String = row.get(4)?;
                let is_deleted: i32 = row.get(5)?;
                let fhir_version: String = row.get(6)?;
                Ok((
                    resource_type,
                    id,
                    version_id,
                    data,
                    last_updated,
                    is_deleted,
                    fhir_version,
                ))
            })
            .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
        let mut entries = Vec::new();
        let mut last_entry: Option<(String, String, String)> = None;
        let mut has_more = false;
        for row in rows {
            let (
                resource_type,
                id,
                version_id,
                data,
                last_updated_str,
                is_deleted,
                fhir_version_str,
            ) = row
                .map_err(|e| internal_error(format!("Failed to read system history row: {}", e)))?;
            if entries.len() >= params.pagination.count as usize {
                // Sentinel row reached: another page exists.
                has_more = true;
                break;
            }
            let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
                serialization_error(format!("Failed to deserialize resource: {}", e))
            })?;
            let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
                .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
                .with_timezone(&Utc);
            // A tombstone row records its deletion time in last_updated.
            let deleted_at = if is_deleted != 0 {
                Some(last_updated)
            } else {
                None
            };
            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
            let resource = StoredResource::from_storage(
                &resource_type,
                &id,
                &version_id,
                tenant.tenant_id().clone(),
                json_data,
                last_updated,
                last_updated,
                deleted_at,
                fhir_version,
            );
            // Infer the HTTP method: tombstones were deletes, version 1 was
            // the original create, everything else was an update.
            let method = if is_deleted != 0 {
                HistoryMethod::Delete
            } else if version_id == "1" {
                HistoryMethod::Post
            } else {
                HistoryMethod::Put
            };
            last_entry = Some((last_updated_str.clone(), resource_type, id));
            entries.push(HistoryEntry {
                resource,
                method,
                timestamp: last_updated,
            });
        }
        let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
        {
            let cursor = PageCursor::new(
                vec![
                    CursorValue::String(timestamp),
                    CursorValue::String(resource_type),
                    CursorValue::String(id),
                ],
                "system".to_string(),
            );
            PageInfo::with_next(cursor)
        } else {
            PageInfo::end()
        };
        Ok(Page::new(entries, page_info))
    }

    /// Counts all history rows for the tenant across every resource type.
    async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM resource_history WHERE tenant_id = ?1",
                params![tenant_id],
                |row| row.get(0),
            )
            .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
        Ok(count as u64)
    }
}
#[async_trait]
impl PurgableStorage for SqliteBackend {
    /// Permanently removes one resource: its current row, its full history,
    /// and its search-index entries. Returns `NotFound` when neither the live
    /// table nor the history table knows the id.
    async fn purge(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
    ) -> StorageResult<()> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        // Check the live table first; the history table is only consulted
        // when the live lookup misses (|| short-circuits).
        let in_live: bool = conn
            .query_row(
                "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                params![tenant_id, resource_type, id],
                |_| Ok(true),
            )
            .unwrap_or(false);
        let known = in_live
            || conn
                .query_row(
                    "SELECT 1 FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                    params![tenant_id, resource_type, id],
                    |_| Ok(true),
                )
                .unwrap_or(false);
        if !known {
            return Err(StorageError::Resource(ResourceError::NotFound {
                resource_type: resource_type.to_string(),
                id: id.to_string(),
            }));
        }
        // Delete from the three tables in the same order as always:
        // live rows, then history, then the search index.
        let steps: [(&str, &str); 3] = [
            (
                "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                "Failed to purge resource",
            ),
            (
                "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
                "Failed to purge resource history",
            ),
            (
                "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
                "Failed to purge search index",
            ),
        ];
        for (delete_sql, failure) in steps {
            conn.execute(delete_sql, params![tenant_id, resource_type, id])
                .map_err(|e| internal_error(format!("{}: {}", failure, e)))?;
        }
        Ok(())
    }

    /// Permanently removes every resource of `resource_type` in the tenant,
    /// returning how many distinct ids were in the live table beforehand.
    async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        // Snapshot the count before deleting; a failed count reads as 0.
        let purged: i64 = conn
            .query_row(
                "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
                params![tenant_id, resource_type],
                |row| row.get(0),
            )
            .unwrap_or(0);
        let steps: [(&str, &str); 3] = [
            (
                "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
                "Failed to purge resources",
            ),
            (
                "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2",
                "Failed to purge resource history",
            ),
            (
                "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2",
                "Failed to purge search index",
            ),
        ];
        for (delete_sql, failure) in steps {
            conn.execute(delete_sql, params![tenant_id, resource_type])
                .map_err(|e| internal_error(format!("{}: {}", failure, e)))?;
        }
        Ok(purged as u64)
    }
}
#[async_trait]
impl DifferentialHistoryProvider for SqliteBackend {
    /// Returns live (non-deleted) resources modified strictly after `since`,
    /// oldest first, using a keyset cursor of (last_updated, id).
    ///
    /// Fetches one sentinel row beyond the page size so the next-page
    /// decision needs no second query.
    async fn modified_since(
        &self,
        tenant: &TenantContext,
        resource_type: Option<&str>,
        since: chrono::DateTime<Utc>,
        pagination: &Pagination,
    ) -> StorageResult<Page<StoredResource>> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str();
        let since_str = since.to_rfc3339();
        let mut sql = String::from(
            "SELECT resource_type, id, version_id, data, last_updated, fhir_version
             FROM resources
             WHERE tenant_id = ?1 AND last_updated > ?2 AND is_deleted = 0",
        );
        if let Some(rt) = resource_type {
            // Escape single quotes before interpolating into the SQL literal.
            sql.push_str(&format!(
                " AND resource_type = '{}'",
                rt.replace('\'', "''")
            ));
        }
        if let Some(cursor) = pagination.cursor_value() {
            let sort_values = cursor.sort_values();
            if sort_values.len() >= 2 {
                if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
                    (sort_values.first(), sort_values.get(1))
                {
                    // Cursor values are client-supplied; escape single quotes.
                    let ts = timestamp.replace('\'', "''");
                    let rid = res_id.replace('\'', "''");
                    sql.push_str(&format!(
                        " AND (last_updated > '{}' OR (last_updated = '{}' AND id > '{}'))",
                        ts, ts, rid
                    ));
                }
            }
        }
        sql.push_str(" ORDER BY last_updated ASC, id ASC");
        // Fetch one extra (sentinel) row to detect a following page.
        sql.push_str(&format!(" LIMIT {}", pagination.count + 1));
        let mut stmt = conn.prepare(&sql).map_err(|e| {
            internal_error(format!("Failed to prepare modified_since query: {}", e))
        })?;
        let rows = stmt
            .query_map(params![tenant_id, since_str], |row| {
                let resource_type: String = row.get(0)?;
                let id: String = row.get(1)?;
                let version_id: String = row.get(2)?;
                let data: Vec<u8> = row.get(3)?;
                let last_updated: String = row.get(4)?;
                let fhir_version: String = row.get(5)?;
                Ok((
                    resource_type,
                    id,
                    version_id,
                    data,
                    last_updated,
                    fhir_version,
                ))
            })
            .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
        let mut resources = Vec::new();
        let mut last_entry: Option<(String, String)> = None;
        let mut has_more = false;
        for row in rows {
            let (resource_type, id, version_id, data, last_updated_str, fhir_version_str) =
                row.map_err(|e| internal_error(format!("Failed to read row: {}", e)))?;
            if resources.len() >= pagination.count as usize {
                // Sentinel row reached: another page exists.
                has_more = true;
                break;
            }
            let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
                serialization_error(format!("Failed to deserialize resource: {}", e))
            })?;
            let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
                .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
                .with_timezone(&Utc);
            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
            let resource = StoredResource::from_storage(
                &resource_type,
                &id,
                &version_id,
                tenant.tenant_id().clone(),
                json_data,
                last_updated,
                last_updated,
                None,
                fhir_version,
            );
            last_entry = Some((last_updated_str, id));
            resources.push(resource);
        }
        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
            let cursor = PageCursor::new(
                vec![CursorValue::String(timestamp), CursorValue::String(id)],
                "modified_since".to_string(),
            );
            PageInfo::with_next(cursor)
        } else {
            PageInfo::end()
        };
        Ok(Page::new(resources, page_info))
    }
}
/// Splits a raw query string ("name=value&name2=value2") into (name, value)
/// pairs. Segments without an '=' are dropped; values keep any further '='
/// characters verbatim. No URL-decoding is performed.
fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for segment in params.split('&') {
        if let Some((key, value)) = segment.split_once('=') {
            pairs.push((key.to_string(), value.to_string()));
        }
    }
    pairs
}
#[async_trait]
impl ConditionalStorage for SqliteBackend {
async fn conditional_create(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
search_params: &str,
fhir_version: FhirVersion,
) -> StorageResult<ConditionalCreateResult> {
let matches = self
.find_matching_resources(tenant, resource_type, search_params)
.await?;
match matches.len() {
0 => {
let created = self
.create(tenant, resource_type, resource, fhir_version)
.await?;
Ok(ConditionalCreateResult::Created(created))
}
1 => {
Ok(ConditionalCreateResult::Exists(
matches.into_iter().next().unwrap(),
))
}
n => {
Ok(ConditionalCreateResult::MultipleMatches(n))
}
}
}
async fn conditional_update(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
search_params: &str,
upsert: bool,
fhir_version: FhirVersion,
) -> StorageResult<ConditionalUpdateResult> {
let matches = self
.find_matching_resources(tenant, resource_type, search_params)
.await?;
match matches.len() {
0 => {
if upsert {
let created = self
.create(tenant, resource_type, resource, fhir_version)
.await?;
Ok(ConditionalUpdateResult::Created(created))
} else {
Ok(ConditionalUpdateResult::NoMatch)
}
}
1 => {
let existing = matches.into_iter().next().unwrap();
let updated = self.update(tenant, &existing, resource).await?;
Ok(ConditionalUpdateResult::Updated(updated))
}
n => {
Ok(ConditionalUpdateResult::MultipleMatches(n))
}
}
}
async fn conditional_delete(
&self,
tenant: &TenantContext,
resource_type: &str,
search_params: &str,
) -> StorageResult<ConditionalDeleteResult> {
let matches = self
.find_matching_resources(tenant, resource_type, search_params)
.await?;
match matches.len() {
0 => {
Ok(ConditionalDeleteResult::NoMatch)
}
1 => {
let existing = matches.into_iter().next().unwrap();
self.delete(tenant, resource_type, existing.id()).await?;
Ok(ConditionalDeleteResult::Deleted)
}
n => {
Ok(ConditionalDeleteResult::MultipleMatches(n))
}
}
}
async fn conditional_patch(
&self,
tenant: &TenantContext,
resource_type: &str,
search_params: &str,
patch: &crate::core::PatchFormat,
) -> StorageResult<crate::core::ConditionalPatchResult> {
use crate::core::{ConditionalPatchResult, PatchFormat};
let matches = self
.find_matching_resources(tenant, resource_type, search_params)
.await?;
match matches.len() {
0 => Ok(ConditionalPatchResult::NoMatch),
1 => {
let existing = matches.into_iter().next().unwrap();
let current_content = existing.content().clone();
let patched_content = match patch {
PatchFormat::JsonPatch(patch_doc) => {
self.apply_json_patch(¤t_content, patch_doc)?
}
PatchFormat::FhirPathPatch(patch_params) => {
self.apply_fhirpath_patch(¤t_content, patch_params)?
}
PatchFormat::MergePatch(merge_doc) => {
self.apply_merge_patch(¤t_content, merge_doc)
}
};
let updated = self.update(tenant, &existing, patched_content).await?;
Ok(ConditionalPatchResult::Patched(updated))
}
n => Ok(ConditionalPatchResult::MultipleMatches(n)),
}
}
}
impl SqliteBackend {
    /// Resolves a conditional-operation query string ("name=value&...") to
    /// the currently matching resources, capped at 1000 matches.
    ///
    /// An empty or unparseable query yields no matches rather than matching
    /// everything, so conditional operations cannot accidentally apply to all
    /// resources of a type.
    async fn find_matching_resources(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        search_params_str: &str,
    ) -> StorageResult<Vec<StoredResource>> {
        let parsed_params = parse_simple_search_params(search_params_str);
        if parsed_params.is_empty() {
            return Ok(Vec::new());
        }
        let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
        let query = SearchQuery {
            resource_type: resource_type.to_string(),
            parameters: search_params,
            count: Some(1000),
            ..Default::default()
        };
        let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
        Ok(result.resources.items)
    }

    /// Converts raw (name, value) pairs into typed `SearchParameter`s.
    ///
    /// Types come from the search-parameter registry when available; otherwise
    /// a set of well-known FHIR parameter names falls back to sensible types,
    /// defaulting to string search.
    fn build_search_parameters(
        &self,
        resource_type: &str,
        params: &[(String, String)],
    ) -> StorageResult<Vec<SearchParameter>> {
        let registry = self.search_registry().read();
        let mut search_params = Vec::with_capacity(params.len());
        for (name, value) in params {
            // unwrap_or_else keeps the fallback lazy — it is only evaluated
            // when the registry has no definition for this parameter.
            let param_type = self
                .lookup_param_type(&registry, resource_type, name)
                .unwrap_or_else(|| match name.as_str() {
                    "_id" => SearchParamType::Token,
                    "_lastUpdated" => SearchParamType::Date,
                    "_tag" | "_profile" | "_security" => SearchParamType::Token,
                    "identifier" => SearchParamType::Token,
                    "patient" | "subject" | "encounter" | "performer" | "author"
                    | "requester" | "recorder" | "asserter" | "practitioner"
                    | "organization" | "location" | "device" => SearchParamType::Reference,
                    _ => SearchParamType::String,
                });
            search_params.push(SearchParameter {
                name: name.clone(),
                param_type,
                modifier: None,
                values: vec![SearchValue::parse(value)],
                chain: vec![],
                components: vec![],
            });
        }
        Ok(search_params)
    }

    /// Looks up a search parameter's type: first for the concrete resource
    /// type, then for the base "Resource" definitions.
    fn lookup_param_type(
        &self,
        registry: &crate::search::SearchParameterRegistry,
        resource_type: &str,
        param_name: &str,
    ) -> Option<SearchParamType> {
        if let Some(def) = registry.get_param(resource_type, param_name) {
            return Some(def.param_type);
        }
        if let Some(def) = registry.get_param("Resource", param_name) {
            return Some(def.param_type);
        }
        None
    }

    /// Applies an RFC 6902 JSON Patch document to a resource and returns the
    /// patched copy. Invalid documents and failed operations are surfaced as
    /// validation errors.
    fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
        use crate::error::ValidationError;
        let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
            StorageError::Validation(ValidationError::InvalidResource {
                message: format!("Invalid JSON Patch document: {}", e),
                details: vec![],
            })
        })?;
        let mut patched = resource.clone();
        json_patch::patch(&mut patched, &patch).map_err(|e| {
            StorageError::Validation(ValidationError::InvalidResource {
                message: format!("Failed to apply JSON Patch: {}", e),
                details: vec![],
            })
        })?;
        Ok(patched)
    }

    /// Applies a FHIRPath Patch (a Parameters resource) to a resource.
    ///
    /// Each `parameter` entry describes one operation through its `part`
    /// entries: `type` (replace/add/delete), `path`, an optional `name`, and
    /// a primitive `value`. Malformed entries and unsupported operation types
    /// are skipped. Only simple top-level paths ("Type.field") are handled by
    /// the fhirpath_* helpers; deeper FHIRPath expressions are silently
    /// ignored — NOTE(review): confirm that limitation is acceptable for the
    /// callers of conditional patch.
    fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
        use crate::error::ValidationError;
        let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
        if parameter.is_none() {
            return Err(StorageError::Validation(ValidationError::InvalidResource {
                message: "FHIRPath Patch must have a 'parameter' array".to_string(),
                details: vec![],
            }));
        }
        let mut patched = resource.clone();
        for operation in parameter.unwrap() {
            let parts = operation.get("part").and_then(|p| p.as_array());
            if parts.is_none() {
                continue;
            }
            // Collect this operation's components from its parts.
            let mut op_type = None;
            let mut op_path = None;
            let mut op_name = None;
            let mut op_value = None;
            for part in parts.unwrap() {
                match part.get("name").and_then(|n| n.as_str()) {
                    Some("type") => {
                        op_type = part
                            .get("valueCode")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                    }
                    Some("path") => {
                        op_path = part
                            .get("valueString")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                    }
                    Some("name") => {
                        op_name = part
                            .get("valueString")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                    }
                    Some("value") => {
                        // Accept the common primitive value[x] variants.
                        op_value = part
                            .get("valueString")
                            .or_else(|| part.get("valueBoolean"))
                            .or_else(|| part.get("valueInteger"))
                            .or_else(|| part.get("valueDecimal"))
                            .or_else(|| part.get("valueCode"))
                            .cloned();
                    }
                    _ => {}
                }
            }
            match op_type.as_deref() {
                Some("replace") => {
                    if let (Some(path), Some(value)) = (&op_path, &op_value) {
                        self.fhirpath_replace(&mut patched, path, value)?;
                    }
                }
                Some("add") => {
                    if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
                        self.fhirpath_add(&mut patched, path, name, value)?;
                    }
                }
                Some("delete") => {
                    if let Some(path) = &op_path {
                        self.fhirpath_delete(&mut patched, path)?;
                    }
                }
                _ => {
                    // Unsupported operation types (insert, move, ...) are ignored.
                }
            }
        }
        Ok(patched)
    }

    /// Replaces a top-level field for a two-segment "Type.field" path; any
    /// other path shape is a no-op.
    fn fhirpath_replace(
        &self,
        resource: &mut Value,
        path: &str,
        value: &Value,
    ) -> StorageResult<()> {
        let parts: Vec<&str> = path.split('.').collect();
        if parts.len() == 2 {
            if let Some(obj) = resource.as_object_mut() {
                obj.insert(parts[1].to_string(), value.clone());
            }
        }
        Ok(())
    }

    /// Adds a named top-level field when the path is exactly the resource's
    /// own resourceType; any other path is a no-op.
    fn fhirpath_add(
        &self,
        resource: &mut Value,
        path: &str,
        name: &str,
        value: &Value,
    ) -> StorageResult<()> {
        let parts: Vec<&str> = path.split('.').collect();
        if parts.len() == 1
            && parts[0]
                == resource
                    .get("resourceType")
                    .and_then(|r| r.as_str())
                    .unwrap_or("")
        {
            if let Some(obj) = resource.as_object_mut() {
                obj.insert(name.to_string(), value.clone());
            }
        }
        Ok(())
    }

    /// Removes a top-level field for a two-segment "Type.field" path; any
    /// other path shape is a no-op.
    fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
        let parts: Vec<&str> = path.split('.').collect();
        if parts.len() == 2 {
            if let Some(obj) = resource.as_object_mut() {
                obj.remove(parts[1]);
            }
        }
        Ok(())
    }

    /// Applies an RFC 7386 JSON Merge Patch; infallible by construction.
    fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
        let mut patched = resource.clone();
        json_patch::merge(&mut patched, merge_doc);
        patched
    }
}
#[async_trait]
impl BundleProvider for SqliteBackend {
    /// Processes a FHIR transaction bundle atomically: either every entry
    /// succeeds and the transaction commits, or the whole transaction is
    /// rolled back.
    ///
    /// Entries are executed in bundle order. `urn:uuid:` placeholder
    /// references in later entries are rewritten to the locations of
    /// resources created by earlier POST entries. Any entry error, or an
    /// entry result with status >= 400, aborts with
    /// `TransactionError::BundleError` carrying the failing entry's index.
    async fn process_transaction(
        &self,
        tenant: &TenantContext,
        entries: Vec<BundleEntry>,
    ) -> Result<BundleResult, TransactionError> {
        use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
        use std::collections::HashMap;
        let mut tx = self
            .begin_transaction(tenant, TransactionOptions::new())
            .await
            .map_err(|e| TransactionError::RolledBack {
                reason: format!("Failed to begin transaction: {}", e),
            })?;
        let mut results = Vec::with_capacity(entries.len());
        let mut error_info: Option<(usize, String)> = None;
        // Maps an entry's fullUrl (urn:uuid:...) to the "ResourceType/id"
        // reference of the resource it created.
        let mut reference_map: HashMap<String, String> = HashMap::new();
        let mut entries = entries;
        for (idx, entry) in entries.iter_mut().enumerate() {
            // Rewrite placeholder references before executing this entry so
            // it can point at resources created earlier in the bundle.
            if let Some(ref mut resource) = entry.resource {
                resolve_bundle_references(resource, &reference_map);
            }
            let result = self.process_bundle_entry_tx(&mut tx, entry).await;
            match result {
                Ok(entry_result) => {
                    if entry_result.status >= 400 {
                        error_info = Some((
                            idx,
                            format!("Entry failed with status {}", entry_result.status),
                        ));
                        break;
                    }
                    // Record the new location of POSTed resources so later
                    // entries can resolve their placeholder references.
                    if entry.method == BundleMethod::Post {
                        if let Some(ref full_url) = entry.full_url {
                            if let Some(ref location) = entry_result.location {
                                // Strip any "/_history/<n>" suffix from the
                                // Location so the map holds "Type/id" only.
                                let reference = location
                                    .split("/_history")
                                    .next()
                                    .unwrap_or(location)
                                    .to_string();
                                reference_map.insert(full_url.clone(), reference);
                            }
                        }
                    }
                    results.push(entry_result);
                }
                Err(e) => {
                    error_info = Some((idx, format!("Entry processing failed: {}", e)));
                    break;
                }
            }
        }
        if let Some((index, message)) = error_info {
            // Best-effort rollback; the bundle error is what the caller needs.
            let _ = Box::new(tx).rollback().await;
            return Err(TransactionError::BundleError { index, message });
        }
        Box::new(tx)
            .commit()
            .await
            .map_err(|e| TransactionError::RolledBack {
                reason: format!("Commit failed: {}", e),
            })?;
        Ok(BundleResult {
            bundle_type: BundleType::Transaction,
            entries: results,
        })
    }

    /// Processes a batch bundle: entries are independent, so each failure is
    /// recorded as an error entry in the result instead of aborting the rest.
    async fn process_batch(
        &self,
        tenant: &TenantContext,
        entries: Vec<BundleEntry>,
    ) -> StorageResult<BundleResult> {
        let mut results = Vec::with_capacity(entries.len());
        for entry in &entries {
            let result = self.process_batch_entry(tenant, entry).await;
            results.push(result);
        }
        Ok(BundleResult {
            bundle_type: BundleType::Batch,
            entries: results,
        })
    }
}
impl SqliteBackend {
async fn process_bundle_entry_tx(
&self,
tx: &mut crate::backends::sqlite::transaction::SqliteTransaction,
entry: &BundleEntry,
) -> StorageResult<BundleEntryResult> {
use crate::core::transaction::Transaction;
match entry.method {
BundleMethod::Get => {
let (resource_type, id) = self.parse_url(&entry.url)?;
match tx.read(&resource_type, &id).await? {
Some(resource) => Ok(BundleEntryResult::ok(resource)),
None => Ok(BundleEntryResult::error(
404,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "not-found"}]
}),
)),
}
}
BundleMethod::Post => {
let resource = entry.resource.clone().ok_or_else(|| {
StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
field: "resource".to_string(),
})
})?;
let resource_type = resource
.get("resourceType")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
StorageError::Validation(
crate::error::ValidationError::MissingRequiredField {
field: "resourceType".to_string(),
},
)
})?;
let created = tx.create(&resource_type, resource).await?;
Ok(BundleEntryResult::created(created))
}
BundleMethod::Put => {
let resource = entry.resource.clone().ok_or_else(|| {
StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
field: "resource".to_string(),
})
})?;
let (resource_type, id) = self.parse_url(&entry.url)?;
match tx.read(&resource_type, &id).await? {
Some(existing) => {
if let Some(ref if_match) = entry.if_match {
let current_etag = existing.etag();
if current_etag != if_match.as_str() {
return Ok(BundleEntryResult::error(
412,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
}),
));
}
}
let updated = tx.update(&existing, resource).await?;
Ok(BundleEntryResult::ok(updated))
}
None => {
let mut resource_with_id = resource;
resource_with_id["id"] = serde_json::json!(id);
let created = tx.create(&resource_type, resource_with_id).await?;
Ok(BundleEntryResult::created(created))
}
}
}
BundleMethod::Delete => {
let (resource_type, id) = self.parse_url(&entry.url)?;
tx.delete(&resource_type, &id).await?;
Ok(BundleEntryResult::deleted())
}
BundleMethod::Patch => {
Ok(BundleEntryResult::error(
501,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
}),
))
}
}
}
async fn process_batch_entry(
&self,
tenant: &TenantContext,
entry: &BundleEntry,
) -> BundleEntryResult {
match self.process_batch_entry_inner(tenant, entry).await {
Ok(result) => result,
Err(e) => BundleEntryResult::error(
500,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
}),
),
}
}
async fn process_batch_entry_inner(
&self,
tenant: &TenantContext,
entry: &BundleEntry,
) -> StorageResult<BundleEntryResult> {
match entry.method {
BundleMethod::Get => {
let (resource_type, id) = self.parse_url(&entry.url)?;
match self.read(tenant, &resource_type, &id).await? {
Some(resource) => Ok(BundleEntryResult::ok(resource)),
None => Ok(BundleEntryResult::error(
404,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "not-found"}]
}),
)),
}
}
BundleMethod::Post => {
let resource = entry.resource.clone().ok_or_else(|| {
StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
field: "resource".to_string(),
})
})?;
let resource_type = resource
.get("resourceType")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
StorageError::Validation(
crate::error::ValidationError::MissingRequiredField {
field: "resourceType".to_string(),
},
)
})?;
let created = self
.create(tenant, &resource_type, resource, FhirVersion::default())
.await?;
Ok(BundleEntryResult::created(created))
}
BundleMethod::Put => {
let resource = entry.resource.clone().ok_or_else(|| {
StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
field: "resource".to_string(),
})
})?;
let (resource_type, id) = self.parse_url(&entry.url)?;
let (stored, _created) = self
.create_or_update(
tenant,
&resource_type,
&id,
resource,
FhirVersion::default(),
)
.await?;
Ok(BundleEntryResult::ok(stored))
}
BundleMethod::Delete => {
let (resource_type, id) = self.parse_url(&entry.url)?;
match self.delete(tenant, &resource_type, &id).await {
Ok(()) => Ok(BundleEntryResult::deleted()),
Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
Ok(BundleEntryResult::deleted()) }
Err(e) => Err(e),
}
}
BundleMethod::Patch => Ok(BundleEntryResult::error(
501,
serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
}),
)),
}
}
fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
let path = url
.strip_prefix("http://")
.or_else(|| url.strip_prefix("https://"))
.map(|s| {
s.find('/').map(|i| &s[i..]).unwrap_or(s)
})
.unwrap_or(url);
let path = path.trim_start_matches('/');
let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
if parts.len() >= 2 {
let len = parts.len();
Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
} else {
Err(StorageError::Validation(
crate::error::ValidationError::InvalidReference {
reference: url.to_string(),
message: "URL must be in format ResourceType/id".to_string(),
},
))
}
}
}
/// Recursively rewrites `urn:uuid:` placeholder references in a JSON value.
///
/// Whenever an object has a string "reference" field beginning with
/// "urn:uuid:" and `reference_map` contains a resolution for it, the field is
/// replaced in place; all nested objects and arrays are visited.
fn resolve_bundle_references(
    value: &mut serde_json::Value,
    reference_map: &std::collections::HashMap<String, String>,
) {
    use serde_json::Value;
    match value {
        Value::Object(fields) => {
            if let Some(Value::String(reference)) = fields.get_mut("reference") {
                if reference.starts_with("urn:uuid:") {
                    if let Some(resolved) = reference_map.get(reference.as_str()) {
                        *reference = resolved.clone();
                    }
                }
            }
            for child in fields.values_mut() {
                resolve_bundle_references(child, reference_map);
            }
        }
        Value::Array(items) => {
            for item in items.iter_mut() {
                resolve_bundle_references(item, reference_map);
            }
        }
        _ => {}
    }
}
#[async_trait]
impl ReindexableStorage for SqliteBackend {
async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
let conn = self.get_connection()?;
let tenant_id = tenant.tenant_id().as_str().to_string();
let mut stmt = conn
.prepare(
"SELECT DISTINCT resource_type FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
)
.map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
let types: Vec<String> = stmt
.query_map([&tenant_id], |row| row.get(0))
.map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?
.filter_map(|r| r.ok())
.collect();
Ok(types)
}
    /// Counts the tenant's resources of `resource_type` by delegating to the
    /// backend's generic `count` implementation (presumably excludes deleted
    /// rows, matching the other reindex queries — confirm in `count`).
    async fn count_resources(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
    ) -> StorageResult<u64> {
        self.count(tenant, Some(resource_type)).await
    }
    /// Fetches one page of live resources of `resource_type`, ordered by
    /// (last_updated ASC, id ASC), for reindexing.
    ///
    /// `cursor` is an opaque "timestamp|id" string produced by a previous
    /// page; a malformed cursor is treated as no cursor (restart from the
    /// beginning). Returns the page plus the cursor for the next call, or
    /// `None` when this page was not full.
    async fn fetch_resources_page(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        cursor: Option<&str>,
        limit: u32,
    ) -> StorageResult<ResourcePage> {
        let conn = self.get_connection()?;
        let tenant_id = tenant.tenant_id().as_str().to_string();
        // Decode the "timestamp|id" keyset cursor, if any.
        let (cursor_ts, cursor_id) = if let Some(c) = cursor {
            let parts: Vec<&str> = c.split('|').collect();
            if parts.len() == 2 {
                (Some(parts[0].to_string()), Some(parts[1].to_string()))
            } else {
                (None, None)
            }
        } else {
            (None, None)
        };
        // Build the statement with positional parameters; boxing the values
        // lets both branches share one uniform Vec<Box<dyn ToSql>>.
        let (sql, params): (String, Vec<Box<dyn ToSql>>) =
            if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
                (
                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
                     WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
                     AND (last_updated > ?3 OR (last_updated = ?3 AND id > ?4)) \
                     ORDER BY last_updated ASC, id ASC LIMIT ?5"
                        .to_string(),
                    vec![
                        Box::new(tenant_id.clone()) as Box<dyn ToSql>,
                        Box::new(resource_type.to_string()),
                        Box::new(ts.clone()),
                        Box::new(id.clone()),
                        Box::new(limit as i64),
                    ],
                )
            } else {
                (
                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
                     WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
                     ORDER BY last_updated ASC, id ASC LIMIT ?3"
                        .to_string(),
                    vec![
                        Box::new(tenant_id.clone()) as Box<dyn ToSql>,
                        Box::new(resource_type.to_string()),
                        Box::new(limit as i64),
                    ],
                )
            };
        let mut stmt = conn
            .prepare(&sql)
            .map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
        let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
        let resources: Vec<StoredResource> = stmt
            .query_map(param_refs.as_slice(), |row| {
                let id: String = row.get(0)?;
                let version_id: String = row.get(1)?;
                let data: Vec<u8> = row.get(2)?;
                let last_updated: String = row.get(3)?;
                let fhir_version: String = row.get(4)?;
                Ok((id, version_id, data, last_updated, fhir_version))
            })
            .map_err(|e| internal_error(format!("Failed to query resources: {}", e)))?
            // NOTE(review): rows that fail to decode or deserialize are
            // silently skipped; a skipped row shrinks the page below `limit`
            // and can terminate pagination early — confirm this is acceptable
            // for reindexing.
            .filter_map(|r| r.ok())
            .filter_map(|(id, version_id, data, last_updated, fhir_version_str)| {
                let content: Value = serde_json::from_slice(&data).ok()?;
                let last_modified = chrono::DateTime::parse_from_rfc3339(&last_updated)
                    .ok()?
                    .with_timezone(&Utc);
                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
                Some(StoredResource::from_storage(
                    resource_type.to_string(),
                    id,
                    version_id,
                    tenant.tenant_id().clone(),
                    content,
                    last_modified,
                    last_modified,
                    None,
                    fhir_version,
                ))
            })
            .collect();
        // A full page implies there may be more rows; emit the next cursor.
        let next_cursor = if resources.len() == limit as usize {
            resources
                .last()
                .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
        } else {
            None
        };
        Ok(ResourcePage {
            resources,
            next_cursor,
        })
    }
/// Removes every search-index row belonging to one resource instance.
async fn delete_search_entries(
    &self,
    tenant: &TenantContext,
    resource_type: &str,
    resource_id: &str,
) -> StorageResult<()> {
    let conn = self.get_connection()?;
    let tenant_id = tenant.tenant_id().as_str();
    self.delete_search_index(&conn, tenant_id, resource_type, resource_id)
}
/// Extracts the search-parameter values from `resource` and writes one index
/// entry per extracted value.
///
/// Returns the number of entries written; stops at the first write failure.
async fn write_search_entries(
    &self,
    tenant: &TenantContext,
    resource_type: &str,
    resource_id: &str,
    resource: &Value,
) -> StorageResult<usize> {
    let conn = self.get_connection()?;
    let extracted = self
        .search_extractor()
        .extract(resource, resource_type)
        .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
    let tenant_id = tenant.tenant_id().as_str();
    let mut written = 0usize;
    for entry in &extracted {
        self.write_index_entry(&conn, tenant_id, resource_type, resource_id, entry)?;
        written += 1;
    }
    Ok(written)
}
/// Deletes every search-index row belonging to `tenant`.
///
/// Returns how many rows were removed.
async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
    let conn = self.get_connection()?;
    let removed = conn
        .execute(
            "DELETE FROM search_index WHERE tenant_id = ?1",
            params![tenant.tenant_id().as_str()],
        )
        .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
    Ok(removed as u64)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::history::HistoryParams;
use crate::tenant::{TenantId, TenantPermissions};
use serde_json::json;
use std::path::PathBuf;
use crate::backends::sqlite::SqliteBackendConfig;
/// Builds an in-memory SQLite backend with its schema initialized.
///
/// Search-parameter definitions load from the workspace-level `data/`
/// directory when the manifest has two ancestors; otherwise a relative
/// `data` path is used as a fallback.
fn create_test_backend() -> SqliteBackend {
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let data_dir = match manifest_dir.parent().and_then(|p| p.parent()) {
        Some(workspace_root) => workspace_root.join("data"),
        None => PathBuf::from("data"),
    };
    let config = SqliteBackendConfig {
        data_dir: Some(data_dir),
        ..Default::default()
    };
    let backend = SqliteBackend::with_config(":memory:", config).unwrap();
    backend.init_schema().unwrap();
    backend
}
/// A tenant context named "test-tenant" with full permissions.
fn create_test_tenant() -> TenantContext {
    let id = TenantId::new("test-tenant");
    TenantContext::new(id, TenantPermissions::full_access())
}
#[tokio::test]
async fn test_create_and_read() {
    // A freshly created resource starts at version 1 and can be read back.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let patient = json!({
        "resourceType": "Patient",
        "name": [{"family": "Test", "given": ["User"]}]
    });
    let created = backend
        .create(&tenant, "Patient", patient, FhirVersion::default())
        .await
        .unwrap();
    assert_eq!(created.resource_type(), "Patient");
    assert_eq!(created.version_id(), "1");
    let read_back = backend
        .read(&tenant, "Patient", created.id())
        .await
        .unwrap();
    assert!(read_back.is_some());
    assert_eq!(read_back.unwrap().version_id(), "1");
}
#[tokio::test]
async fn test_create_with_id() {
    // Verifies that a client-supplied id is honored instead of a generated UUID.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({
        "resourceType": "Patient",
        "id": "patient-123",
        "name": [{"family": "Test"}]
    });
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    assert_eq!(created.id(), "patient-123");
}
#[tokio::test]
async fn test_create_duplicate_fails() {
    // Verifies that creating the same id twice yields AlreadyExists.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({"id": "patient-1"});
    backend
        .create(&tenant, "Patient", resource.clone(), FhirVersion::default())
        .await
        .unwrap();
    let result = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await;
    assert!(matches!(
        result,
        Err(StorageError::Resource(ResourceError::AlreadyExists { .. }))
    ));
}
#[tokio::test]
async fn test_read_nonexistent() {
    // Reading an id that was never created yields Ok(None), not an error.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let found = backend
        .read(&tenant, "Patient", "nonexistent")
        .await
        .unwrap();
    assert!(found.is_none());
}
#[tokio::test]
async fn test_update() {
    // Verifies that an update bumps the version to 2 and persists new content.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({"name": [{"family": "Original"}]});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let updated_content = json!({"name": [{"family": "Updated"}]});
    let updated = backend
        .update(&tenant, &created, updated_content)
        .await
        .unwrap();
    assert_eq!(updated.version_id(), "2");
    let read = backend
        .read(&tenant, "Patient", created.id())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(read.content()["name"][0]["family"], "Updated");
}
#[tokio::test]
async fn test_update_version_conflict() {
    // Verifies optimistic locking: updating from a stale version fails.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    // First update succeeds; second uses the now-stale `created` handle.
    let _ = backend.update(&tenant, &created, json!({})).await.unwrap();
    let result = backend.update(&tenant, &created, json!({})).await;
    assert!(matches!(
        result,
        Err(StorageError::Concurrency(
            ConcurrencyError::VersionConflict { .. }
        ))
    ));
}
#[tokio::test]
async fn test_delete() {
    // Verifies that reading a deleted resource reports Gone (HTTP 410 semantics).
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    backend
        .delete(&tenant, "Patient", created.id())
        .await
        .unwrap();
    let result = backend.read(&tenant, "Patient", created.id()).await;
    assert!(matches!(
        result,
        Err(StorageError::Resource(ResourceError::Gone { .. }))
    ));
}
#[tokio::test]
async fn test_create_or_update_new() {
    // Verifies PUT-as-create: an unknown id creates version 1 with created=true.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let (resource, created) = backend
        .create_or_update(
            &tenant,
            "Patient",
            "new-id",
            json!({}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    assert!(created);
    assert_eq!(resource.id(), "new-id");
    assert_eq!(resource.version_id(), "1");
}
#[tokio::test]
async fn test_create_or_update_existing() {
    // Verifies PUT on an existing id updates in place: created=false, version 2.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "existing-id"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let (resource, created) = backend
        .create_or_update(
            &tenant,
            "Patient",
            "existing-id",
            json!({}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    assert!(!created);
    assert_eq!(resource.version_id(), "2");
}
#[tokio::test]
async fn test_count() {
    // Verifies count with a type filter and without (None counts all types).
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 0);
    backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 2);
    assert_eq!(
        backend.count(&tenant, Some("Observation")).await.unwrap(),
        1
    );
    assert_eq!(backend.count(&tenant, None).await.unwrap(), 3);
}
#[tokio::test]
async fn test_tenant_isolation() {
    // Verifies that a resource created by one tenant is invisible to another.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    let resource = json!({"id": "patient-1"});
    backend
        .create(&tenant1, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    assert!(
        backend
            .read(&tenant1, "Patient", "patient-1")
            .await
            .unwrap()
            .is_some()
    );
    assert!(
        backend
            .read(&tenant2, "Patient", "patient-1")
            .await
            .unwrap()
            .is_none()
    );
}
#[tokio::test]
async fn test_history_instance_basic() {
    // Instance history lists every version, newest first, with POST for the
    // create and PUT for each update.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({"name": [{"family": "Smith"}]});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let v2 = backend
        .update(&tenant, &created, json!({"name": [{"family": "Jones"}]}))
        .await
        .unwrap();
    let _v3 = backend
        .update(&tenant, &v2, json!({"name": [{"family": "Brown"}]}))
        .await
        .unwrap();
    let params = HistoryParams::new();
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let history = backend
        .history_instance(&tenant, "Patient", created.id(), &params)
        .await
        .unwrap();
    assert_eq!(history.items.len(), 3);
    assert_eq!(history.items[0].resource.version_id(), "3");
    assert_eq!(history.items[1].resource.version_id(), "2");
    assert_eq!(history.items[2].resource.version_id(), "1");
    assert_eq!(history.items[0].method, HistoryMethod::Put);
    assert_eq!(history.items[1].method, HistoryMethod::Put);
    assert_eq!(history.items[2].method, HistoryMethod::Post);
}
#[tokio::test]
async fn test_history_instance_count() {
    // Verifies that the history count equals the number of stored versions.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let v2 = backend.update(&tenant, &created, json!({})).await.unwrap();
    let _v3 = backend.update(&tenant, &v2, json!({})).await.unwrap();
    let count = backend
        .history_instance_count(&tenant, "Patient", created.id())
        .await
        .unwrap();
    assert_eq!(count, 3);
}
#[tokio::test]
async fn test_history_instance_with_delete() {
    // With include_deleted(true) the tombstone appears first, carrying method
    // DELETE and the next version number.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({"id": "p1"});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let _v2 = backend
        .update(&tenant, &created, json!({"id": "p1"}))
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "p1").await.unwrap();
    let params = HistoryParams::new().include_deleted(true);
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let history = backend
        .history_instance(&tenant, "Patient", "p1", &params)
        .await
        .unwrap();
    assert_eq!(history.items.len(), 3);
    assert_eq!(history.items[0].method, HistoryMethod::Delete);
    assert_eq!(history.items[0].resource.version_id(), "3");
}
#[tokio::test]
async fn test_history_instance_exclude_deleted() {
    // With include_deleted(false) the delete tombstone is omitted, leaving
    // only the two content-bearing versions.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({"id": "p2"});
    let created = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let _v2 = backend
        .update(&tenant, &created, json!({"id": "p2"}))
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "p2").await.unwrap();
    let params = HistoryParams::new().include_deleted(false);
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let history = backend
        .history_instance(&tenant, "Patient", "p2", &params)
        .await
        .unwrap();
    assert_eq!(history.items.len(), 2);
    assert_eq!(history.items[0].resource.version_id(), "2");
    assert_eq!(history.items[1].resource.version_id(), "1");
}
#[tokio::test]
async fn test_history_instance_pagination() {
    // With count(2) and five versions, page one holds versions 5 and 4 and
    // signals that more pages follow.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let resource = json!({});
    let mut current = backend
        .create(&tenant, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    // Fix: "¤t" was mojibake for `&current`.
    for _ in 0..4 {
        current = backend.update(&tenant, &current, json!({})).await.unwrap();
    }
    let params = HistoryParams::new().count(2);
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let page1 = backend
        .history_instance(&tenant, "Patient", current.id(), &params)
        .await
        .unwrap();
    assert_eq!(page1.items.len(), 2);
    assert_eq!(page1.items[0].resource.version_id(), "5");
    assert_eq!(page1.items[1].resource.version_id(), "4");
    assert!(page1.page_info.has_next);
}
#[tokio::test]
async fn test_history_instance_nonexistent() {
    // History of an unknown id is an empty page, not an error.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let params = HistoryParams::new();
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let history = backend
        .history_instance(&tenant, "Patient", "nonexistent", &params)
        .await
        .unwrap();
    assert!(history.items.is_empty());
}
#[tokio::test]
async fn test_history_instance_tenant_isolation() {
    // Verifies that history for one tenant's resource is empty for another
    // tenant, even with the same resource id.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    let resource = json!({"id": "shared-id"});
    let created = backend
        .create(&tenant1, "Patient", resource, FhirVersion::default())
        .await
        .unwrap();
    let _v2 = backend
        .update(&tenant1, &created, json!({"id": "shared-id"}))
        .await
        .unwrap();
    let history1 = backend
        .history_instance(&tenant1, "Patient", "shared-id", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history1.items.len(), 2);
    let history2 = backend
        .history_instance(&tenant2, "Patient", "shared-id", &HistoryParams::new())
        .await
        .unwrap();
    assert!(history2.items.is_empty());
}
#[tokio::test]
async fn test_history_type_basic() {
    // Type history spans all instances of the type: p1 v1+v2 plus p2 v1.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let _p2 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p2"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let _p1_v2 = backend
        .update(&tenant, &p1, json!({"id": "p1"}))
        .await
        .unwrap();
    let params = HistoryParams::new();
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let history = backend
        .history_type(&tenant, "Patient", &params)
        .await
        .unwrap();
    assert_eq!(history.items.len(), 3);
    for entry in &history.items {
        assert_eq!(entry.resource.resource_type(), "Patient");
    }
}
#[tokio::test]
async fn test_history_type_count() {
    // Verifies per-type counts: Patient has 3 versions (p1 v1+v2, p2 v1),
    // Observation has 1, and neither leaks into the other.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let _p1_v2 = backend.update(&tenant, &p1, json!({})).await.unwrap();
    let _p2 = backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let count = backend
        .history_type_count(&tenant, "Patient")
        .await
        .unwrap();
    assert_eq!(count, 3);
    let obs_count = backend
        .history_type_count(&tenant, "Observation")
        .await
        .unwrap();
    assert_eq!(obs_count, 1);
}
#[tokio::test]
async fn test_history_type_filters_by_type() {
    // Verifies that type history returns only entries of the requested type.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant, "Encounter", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let history = backend
        .history_type(&tenant, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 1);
    assert_eq!(history.items[0].resource.resource_type(), "Patient");
    let obs_history = backend
        .history_type(&tenant, "Observation", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(obs_history.items.len(), 1);
    assert_eq!(obs_history.items[0].resource.resource_type(), "Observation");
}
#[tokio::test]
async fn test_history_type_includes_deleted() {
    // Type history hides deleted instances unless include_deleted(true) is set.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "del-p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p2"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let visible = backend
        .history_type(&tenant, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(visible.items.len(), 2);
    let with_deleted = backend
        .history_type(
            &tenant,
            "Patient",
            &HistoryParams::new().include_deleted(true),
        )
        .await
        .unwrap();
    assert_eq!(with_deleted.items.len(), 3);
}
#[tokio::test]
async fn test_history_type_tenant_isolation() {
    // Verifies that type history only covers the requesting tenant's versions.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    backend
        .create(&tenant1, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant1, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant2, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let history1 = backend
        .history_type(&tenant1, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history1.items.len(), 2);
    let history2 = backend
        .history_type(&tenant2, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history2.items.len(), 1);
}
#[tokio::test]
async fn test_history_type_pagination() {
    // Five creates with count(2): first page has two items and has_next set.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    for i in 0..5 {
        backend
            .create(
                &tenant,
                "Patient",
                json!({"id": format!("p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    let params = HistoryParams::new().count(2);
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let page1 = backend
        .history_type(&tenant, "Patient", &params)
        .await
        .unwrap();
    assert_eq!(page1.items.len(), 2);
    assert!(page1.page_info.has_next);
}
#[tokio::test]
async fn test_history_type_empty() {
    // With nothing stored, type history is empty and reports no next page.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let page = backend
        .history_type(&tenant, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert!(page.items.is_empty());
    assert!(!page.page_info.has_next);
}
#[tokio::test]
async fn test_history_system_basic() {
    // Verifies that system history spans all resource types: 4 versions total
    // (Patient v1+v2, Observation v1, Encounter v1).
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Observation",
            json!({"id": "o1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Encounter",
            json!({"id": "e1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let _p1_v2 = backend
        .update(&tenant, &p1, json!({"id": "p1"}))
        .await
        .unwrap();
    let history = backend
        .history_system(&tenant, &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 4);
    let types: std::collections::HashSet<_> = history
        .items
        .iter()
        .map(|e| e.resource.resource_type())
        .collect();
    assert!(types.contains("Patient"));
    assert!(types.contains("Observation"));
    assert!(types.contains("Encounter"));
}
#[tokio::test]
async fn test_history_system_count() {
    // Four versions exist in total: Patient v1+v2, one Observation, one Encounter.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let patient = backend
        .create(&tenant, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend.update(&tenant, &patient, json!({})).await.unwrap();
    for resource_type in ["Observation", "Encounter"] {
        backend
            .create(&tenant, resource_type, json!({}), FhirVersion::default())
            .await
            .unwrap();
    }
    assert_eq!(backend.history_system_count(&tenant).await.unwrap(), 4);
}
#[tokio::test]
async fn test_history_system_includes_deleted() {
    // Deleted resources show up in system history only when explicitly requested.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "del-p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
    backend
        .create(&tenant, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let without_deleted = backend
        .history_system(&tenant, &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(without_deleted.items.len(), 2);
    let with_deleted = backend
        .history_system(&tenant, &HistoryParams::new().include_deleted(true))
        .await
        .unwrap();
    assert_eq!(with_deleted.items.len(), 3);
}
#[tokio::test]
async fn test_history_system_tenant_isolation() {
    // Verifies that system history and its count are scoped per tenant.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    backend
        .create(&tenant1, "Patient", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant1, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    backend
        .create(&tenant2, "Encounter", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let history1 = backend
        .history_system(&tenant1, &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history1.items.len(), 2);
    let history2 = backend
        .history_system(&tenant2, &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history2.items.len(), 1);
    assert_eq!(backend.history_system_count(&tenant1).await.unwrap(), 2);
    assert_eq!(backend.history_system_count(&tenant2).await.unwrap(), 1);
}
#[tokio::test]
async fn test_history_system_pagination() {
    // 3 Patients + 2 Observations with count(2): first page has two entries
    // and more pages follow.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    for i in 0..3 {
        backend
            .create(
                &tenant,
                "Patient",
                json!({"id": format!("p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    for i in 0..2 {
        backend
            .create(
                &tenant,
                "Observation",
                json!({"id": format!("o{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    let params = HistoryParams::new().count(2);
    // Fix: the argument was mojibake ("¶ms"); it must be `&params`.
    let page1 = backend.history_system(&tenant, &params).await.unwrap();
    assert_eq!(page1.items.len(), 2);
    assert!(page1.page_info.has_next);
}
#[tokio::test]
async fn test_history_system_empty() {
    // Verifies the empty case: no items, no next page, count of zero.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let history = backend
        .history_system(&tenant, &HistoryParams::new())
        .await
        .unwrap();
    assert!(history.items.is_empty());
    assert!(!history.page_info.has_next);
    assert_eq!(backend.history_system_count(&tenant).await.unwrap(), 0);
}
#[tokio::test]
async fn test_history_system_ordered_by_time() {
    // Verifies newest-first ordering: the last-created resource comes first.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "first"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Observation",
            json!({"id": "second"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Encounter",
            json!({"id": "third"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let history = backend
        .history_system(&tenant, &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 3);
    assert_eq!(history.items[0].resource.id(), "third");
    assert_eq!(history.items[1].resource.id(), "second");
    assert_eq!(history.items[2].resource.id(), "first");
}
#[tokio::test]
async fn test_delete_instance_history() {
    // Verifies that delete_instance_history removes all superseded versions
    // (returning how many) while keeping the current version readable.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "name": [{"family": "Smith"}]}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let p1_v2 = backend
        .update(
            &tenant,
            &p1,
            json!({"id": "p1", "name": [{"family": "Jones"}]}),
        )
        .await
        .unwrap();
    let _p1_v3 = backend
        .update(
            &tenant,
            &p1_v2,
            json!({"id": "p1", "name": [{"family": "Brown"}]}),
        )
        .await
        .unwrap();
    let history = backend
        .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 3);
    let deleted_count = backend
        .delete_instance_history(&tenant, "Patient", "p1")
        .await
        .unwrap();
    assert_eq!(deleted_count, 2);
    let history = backend
        .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 1);
    assert_eq!(history.items[0].resource.version_id(), "3");
    let resource = backend.read(&tenant, "Patient", "p1").await.unwrap();
    assert!(resource.is_some());
    assert_eq!(resource.unwrap().version_id(), "3");
}
#[tokio::test]
async fn test_delete_instance_history_nonexistent() {
    // Verifies that pruning history of an unknown resource yields NotFound.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .delete_instance_history(&tenant, "Patient", "nonexistent")
        .await;
    assert!(matches!(
        result,
        Err(StorageError::Resource(ResourceError::NotFound { .. }))
    ));
}
#[tokio::test]
async fn test_delete_version() {
    // Verifies that a single historical version can be removed while the
    // other versions (including the current one) remain.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "name": [{"family": "Smith"}]}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let p1_v2 = backend
        .update(
            &tenant,
            &p1,
            json!({"id": "p1", "name": [{"family": "Jones"}]}),
        )
        .await
        .unwrap();
    let _p1_v3 = backend
        .update(
            &tenant,
            &p1_v2,
            json!({"id": "p1", "name": [{"family": "Brown"}]}),
        )
        .await
        .unwrap();
    backend
        .delete_version(&tenant, "Patient", "p1", "2")
        .await
        .unwrap();
    let history = backend
        .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(history.items.len(), 2);
    let versions: Vec<&str> = history
        .items
        .iter()
        .map(|e| e.resource.version_id())
        .collect();
    assert!(versions.contains(&"1"));
    assert!(versions.contains(&"3"));
    assert!(!versions.contains(&"2"));
}
#[tokio::test]
async fn test_delete_version_current_fails() {
    // Verifies that the current version ("2" after one update) is protected
    // from version-level deletion.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let _p1_v2 = backend
        .update(&tenant, &p1, json!({"id": "p1"}))
        .await
        .unwrap();
    let result = backend.delete_version(&tenant, "Patient", "p1", "2").await;
    assert!(matches!(result, Err(StorageError::Validation(_))));
}
#[tokio::test]
async fn test_delete_version_nonexistent() {
    // Verifies that deleting a version the resource never had yields
    // VersionNotFound.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let result = backend
        .delete_version(&tenant, "Patient", "p1", "999")
        .await;
    assert!(matches!(
        result,
        Err(StorageError::Resource(
            ResourceError::VersionNotFound { .. }
        ))
    ));
}
#[tokio::test]
async fn test_delete_version_resource_not_found() {
    // Verifies that version deletion on an unknown resource yields NotFound.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .delete_version(&tenant, "Patient", "nonexistent", "1")
        .await;
    assert!(matches!(
        result,
        Err(StorageError::Resource(ResourceError::NotFound { .. }))
    ));
}
#[tokio::test]
async fn test_purge_single_resource() {
    // Verifies a hard purge: the resource is unreadable (None, not Gone) and
    // its entire history is erased.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let p1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let _p1_v2 = backend
        .update(&tenant, &p1, json!({"id": "p1"}))
        .await
        .unwrap();
    backend.purge(&tenant, "Patient", "p1").await.unwrap();
    let read_result = backend.read(&tenant, "Patient", "p1").await.unwrap();
    assert!(read_result.is_none());
    let history = backend
        .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
        .await
        .unwrap();
    assert!(history.items.is_empty());
}
#[tokio::test]
async fn test_purge_deleted_resource() {
    // Verifies that a soft-deleted resource can still be purged, erasing even
    // its tombstone from history.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "del-p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
    backend.purge(&tenant, "Patient", "del-p1").await.unwrap();
    let history = backend
        .history_instance(
            &tenant,
            "Patient",
            "del-p1",
            &HistoryParams::new().include_deleted(true),
        )
        .await
        .unwrap();
    assert!(history.items.is_empty());
}
#[tokio::test]
async fn test_purge_nonexistent_resource() {
    // Purging an unknown resource is an error rather than a silent no-op.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    assert!(
        backend
            .purge(&tenant, "Patient", "nonexistent")
            .await
            .is_err()
    );
}
#[tokio::test]
async fn test_purge_tenant_isolation() {
    // Verifies that purging one tenant's copy of an id leaves another
    // tenant's resource with the same id untouched.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    backend
        .create(
            &tenant1,
            "Patient",
            json!({"id": "shared-id"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant2,
            "Patient",
            json!({"id": "shared-id"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .purge(&tenant1, "Patient", "shared-id")
        .await
        .unwrap();
    let t2_read = backend
        .read(&tenant2, "Patient", "shared-id")
        .await
        .unwrap();
    assert!(t2_read.is_some());
    let t1_read = backend
        .read(&tenant1, "Patient", "shared-id")
        .await
        .unwrap();
    assert!(t1_read.is_none());
}
#[tokio::test]
async fn test_purge_all_single_type() {
    // Verifies purge_all removes every instance (and history) of one type
    // while leaving other types intact, and reports how many were purged.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    for i in 0..5 {
        backend
            .create(
                &tenant,
                "Patient",
                json!({"id": format!("p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    backend
        .create(&tenant, "Observation", json!({}), FhirVersion::default())
        .await
        .unwrap();
    let count = backend.purge_all(&tenant, "Patient").await.unwrap();
    assert_eq!(count, 5);
    let patient_history = backend
        .history_type(&tenant, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert!(patient_history.items.is_empty());
    let obs_history = backend
        .history_type(&tenant, "Observation", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(obs_history.items.len(), 1);
}
#[tokio::test]
async fn test_purge_all_empty_type() {
    // Purging a type with no instances reports zero resources removed.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let removed = backend.purge_all(&tenant, "Patient").await.unwrap();
    assert_eq!(removed, 0);
}
#[tokio::test]
async fn test_purge_all_tenant_isolation() {
    // Verifies purge_all only affects the requesting tenant's resources.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    for i in 0..3 {
        backend
            .create(
                &tenant1,
                "Patient",
                json!({"id": format!("t1-p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    for i in 0..2 {
        backend
            .create(
                &tenant2,
                "Patient",
                json!({"id": format!("t2-p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    let count = backend.purge_all(&tenant1, "Patient").await.unwrap();
    assert_eq!(count, 3);
    let t2_history = backend
        .history_type(&tenant2, "Patient", &HistoryParams::new())
        .await
        .unwrap();
    assert_eq!(t2_history.items.len(), 2);
}
#[tokio::test]
async fn test_modified_since_basic() {
    // Verifies that everything created after the timestamp is returned when
    // no type filter is given.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let before_create = Utc::now();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p2"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Observation",
            json!({"id": "o1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let pagination = Pagination::default();
    let result = backend
        .modified_since(&tenant, None, before_create, &pagination)
        .await
        .unwrap();
    assert_eq!(result.items.len(), 3);
}
#[tokio::test]
async fn test_modified_since_with_type_filter() {
    // Verifies the type filter: only Patients are returned, not Observations.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let before_create = Utc::now();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p2"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant,
            "Observation",
            json!({"id": "o1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let pagination = Pagination::default();
    let result = backend
        .modified_since(&tenant, Some("Patient"), before_create, &pagination)
        .await
        .unwrap();
    assert_eq!(result.items.len(), 2);
    for resource in &result.items {
        assert_eq!(resource.resource_type(), "Patient");
    }
}
#[tokio::test]
async fn test_modified_since_excludes_older() {
    // Verifies the cutoff: resources last modified before the timestamp are
    // excluded from the result.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "old"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let after_first = Utc::now();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "new"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let pagination = Pagination::default();
    let result = backend
        .modified_since(&tenant, None, after_first, &pagination)
        .await
        .unwrap();
    assert_eq!(result.items.len(), 1);
    assert_eq!(result.items[0].id(), "new");
}
#[tokio::test]
async fn test_modified_since_tenant_isolation() {
    // Verifies that modified_since only returns the requesting tenant's data.
    let backend = create_test_backend();
    let tenant1 =
        TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
    let tenant2 =
        TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
    let before_create = Utc::now();
    backend
        .create(
            &tenant1,
            "Patient",
            json!({"id": "t1-p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    backend
        .create(
            &tenant2,
            "Patient",
            json!({"id": "t2-p1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let pagination = Pagination::default();
    let result1 = backend
        .modified_since(&tenant1, None, before_create, &pagination)
        .await
        .unwrap();
    assert_eq!(result1.items.len(), 1);
    assert_eq!(result1.items[0].id(), "t1-p1");
    let result2 = backend
        .modified_since(&tenant2, None, before_create, &pagination)
        .await
        .unwrap();
    assert_eq!(result2.items.len(), 1);
    assert_eq!(result2.items[0].id(), "t2-p1");
}
#[tokio::test]
async fn test_modified_since_excludes_deleted() {
    // A resource deleted after the cutoff must not appear in the results;
    // only the still-live resource is returned.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let since = Utc::now();
    backend
        .create(&tenant, "Patient", json!({"id": "del-p1"}), FhirVersion::default())
        .await
        .unwrap();
    backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
    backend
        .create(&tenant, "Patient", json!({"id": "live-p1"}), FhirVersion::default())
        .await
        .unwrap();
    let page = backend
        .modified_since(&tenant, None, since, &Pagination::default())
        .await
        .unwrap();
    assert_eq!(page.items.len(), 1);
    assert_eq!(page.items[0].id(), "live-p1");
}
#[tokio::test]
async fn test_modified_since_pagination() {
    // With a cursor page size of 2 and 5 matching resources, the first
    // page holds exactly 2 items and signals that more pages follow.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let since = Utc::now();
    for i in 0..5 {
        backend
            .create(
                &tenant,
                "Patient",
                json!({"id": format!("p{}", i)}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
    }
    let first_page = backend
        .modified_since(&tenant, None, since, &Pagination::cursor().with_count(2))
        .await
        .unwrap();
    assert_eq!(first_page.items.len(), 2);
    assert!(first_page.page_info.has_next);
}
#[tokio::test]
async fn test_modified_since_empty() {
    // No resources exist: the page is empty and reports no next page.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let page = backend
        .modified_since(&tenant, None, Utc::now(), &Pagination::default())
        .await
        .unwrap();
    assert!(page.items.is_empty());
    assert!(!page.page_info.has_next);
}
#[tokio::test]
async fn test_modified_since_returns_current_version() {
    // After two updates, modified_since must surface a single entry for
    // the resource carrying its latest version ("3"), not one per version.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let since = Utc::now();
    let v1 = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "name": "v1"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let v2 = backend
        .update(&tenant, &v1, json!({"id": "p1", "name": "v2"}))
        .await
        .unwrap();
    backend
        .update(&tenant, &v2, json!({"id": "p1", "name": "v3"}))
        .await
        .unwrap();
    let page = backend
        .modified_since(&tenant, None, since, &Pagination::default())
        .await
        .unwrap();
    assert_eq!(page.items.len(), 1);
    assert_eq!(page.items[0].version_id(), "3");
}
#[tokio::test]
async fn test_conditional_create_no_match() {
    // When the conditional query matches nothing, a new resource is
    // created (the query value deliberately differs from the identifier
    // carried by the resource body).
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .conditional_create(
            &tenant,
            "Patient",
            json!({"identifier": [{"value": "12345"}]}),
            "identifier=99999",
            FhirVersion::default(),
        )
        .await
        .unwrap();
    match result {
        ConditionalCreateResult::Created(created) => {
            assert_eq!(created.resource_type(), "Patient");
        }
        _ => panic!("Expected Created result"),
    }
}
#[tokio::test]
async fn test_conditional_create_single_match() {
    // Exactly one existing resource matches the query: conditional create
    // returns the existing resource instead of creating a duplicate.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let existing = backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "identifier": [{"value": "12345"}]}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let result = backend
        .conditional_create(
            &tenant,
            "Patient",
            json!({"identifier": [{"value": "12345"}]}),
            "identifier=12345",
            FhirVersion::default(),
        )
        .await
        .unwrap();
    match result {
        ConditionalCreateResult::Exists(found) => assert_eq!(found.id(), existing.id()),
        _ => panic!("Expected Exists result"),
    }
}
#[tokio::test]
async fn test_conditional_create_by_id() {
    // An `_id` query matching an existing resource short-circuits creation.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(&tenant, "Patient", json!({"id": "p1"}), FhirVersion::default())
        .await
        .unwrap();
    let result = backend
        .conditional_create(&tenant, "Patient", json!({}), "_id=p1", FhirVersion::default())
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalCreateResult::Exists(ref found) if found.id() == "p1"),
        "Expected Exists result"
    );
}
#[tokio::test]
async fn test_conditional_update_single_match() {
    // A single matching resource is updated in place, bumping its version
    // from "1" to "2".
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "identifier": [{"value": "12345"}], "active": false}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let result = backend
        .conditional_update(
            &tenant,
            "Patient",
            json!({"id": "p1", "identifier": [{"value": "12345"}], "active": true}),
            "identifier=12345",
            false, // upsert disabled
            FhirVersion::default(),
        )
        .await
        .unwrap();
    match result {
        ConditionalUpdateResult::Updated(updated) => assert_eq!(updated.version_id(), "2"),
        _ => panic!("Expected Updated result"),
    }
}
#[tokio::test]
async fn test_conditional_update_no_match_no_upsert() {
    // No match and upsert disabled: nothing is written, NoMatch is returned.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .conditional_update(
            &tenant,
            "Patient",
            json!({"identifier": [{"value": "99999"}]}),
            "identifier=99999",
            false, // upsert disabled
            FhirVersion::default(),
        )
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalUpdateResult::NoMatch),
        "Expected NoMatch result"
    );
}
#[tokio::test]
async fn test_conditional_update_no_match_with_upsert() {
    // No match but upsert enabled: the resource is created instead.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .conditional_update(
            &tenant,
            "Patient",
            json!({"identifier": [{"value": "new-id"}]}),
            "identifier=new-id",
            true, // upsert enabled
            FhirVersion::default(),
        )
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalUpdateResult::Created(ref r) if r.resource_type() == "Patient"),
        "Expected Created result"
    );
}
#[tokio::test]
async fn test_conditional_delete_single_match() {
    // A single match is deleted; a follow-up read must report the resource
    // as either absent (Ok(None)) or gone (HTTP 410 semantics), since
    // backends may surface deletions either way.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(&tenant, "Patient", json!({"id": "p1"}), FhirVersion::default())
        .await
        .unwrap();
    let result = backend
        .conditional_delete(&tenant, "Patient", "_id=p1")
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalDeleteResult::Deleted),
        "Expected Deleted result"
    );
    match backend.read(&tenant, "Patient", "p1").await {
        Ok(None) => {}
        Err(StorageError::Resource(ResourceError::Gone { .. })) => {}
        other => panic!("Expected None or Gone, got {:?}", other),
    }
}
#[tokio::test]
async fn test_conditional_delete_no_match() {
    // Deleting with a selector that matches nothing reports NoMatch.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let result = backend
        .conditional_delete(&tenant, "Patient", "_id=nonexistent")
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalDeleteResult::NoMatch),
        "Expected NoMatch result"
    );
}
#[tokio::test]
async fn test_conditional_operations_tenant_isolation() {
    // An id that exists only for tenant-1 must be invisible to the
    // conditional-create match query run by tenant-2, so tenant-2's
    // create proceeds instead of reporting Exists.
    let backend = create_test_backend();
    let full_access =
        |name: &str| TenantContext::new(TenantId::new(name), TenantPermissions::full_access());
    let tenant1 = full_access("tenant-1");
    let tenant2 = full_access("tenant-2");
    backend
        .create(&tenant1, "Patient", json!({"id": "shared-id"}), FhirVersion::default())
        .await
        .unwrap();
    let result = backend
        .conditional_create(
            &tenant2,
            "Patient",
            json!({}),
            "_id=shared-id",
            FhirVersion::default(),
        )
        .await
        .unwrap();
    assert!(
        matches!(result, ConditionalCreateResult::Created(_)),
        "Expected Created result (tenant isolation)"
    );
}
#[tokio::test]
async fn test_conditional_patch_json_patch() {
    use crate::core::PatchFormat;
    // A JSON Patch applied through a conditional selector updates the
    // single matching resource.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "active": false, "name": [{"family": "Smith"}]}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let patch =
        PatchFormat::JsonPatch(json!([{"op": "replace", "path": "/active", "value": true}]));
    let result = backend
        .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
        .await
        .unwrap();
    match result {
        crate::core::ConditionalPatchResult::Patched(patched) => {
            assert_eq!(patched.content()["active"], json!(true));
        }
        _ => panic!("Expected Patched result"),
    }
}
#[tokio::test]
async fn test_conditional_patch_merge_patch() {
    use crate::core::PatchFormat;
    // Merge-patch semantics (RFC 7386): non-null values overwrite,
    // explicit `null` removes the key from the resource.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({"id": "p1", "active": false, "gender": "unknown"}),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    let patch = PatchFormat::MergePatch(json!({
        "active": true,
        "gender": null
    }));
    let result = backend
        .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
        .await
        .unwrap();
    match result {
        crate::core::ConditionalPatchResult::Patched(patched) => {
            assert_eq!(patched.content()["active"], json!(true));
            // `null` in the merge patch must delete the key entirely.
            assert!(patched.content().get("gender").is_none());
        }
        _ => panic!("Expected Patched result"),
    }
}
#[tokio::test]
async fn test_conditional_patch_no_match() {
    // Patching with a selector that matches nothing yields NoMatch.
    use crate::core::PatchFormat;
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let patch =
        PatchFormat::JsonPatch(json!([{"op": "replace", "path": "/active", "value": true}]));
    let result = backend
        .conditional_patch(&tenant, "Patient", "_id=nonexistent", &patch)
        .await
        .unwrap();
    assert!(
        matches!(result, crate::core::ConditionalPatchResult::NoMatch),
        "Expected NoMatch result"
    );
}
#[tokio::test]
async fn test_batch_create_multiple() {
    use crate::core::transaction::BundleProvider;
    // Two POST entries in one batch: both should succeed with 201 Created.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    // Helper to avoid duplicating the mostly-empty BundleEntry literal.
    let post_patient = |id: &str| BundleEntry {
        method: BundleMethod::Post,
        url: "Patient".to_string(),
        resource: Some(json!({"resourceType": "Patient", "id": id})),
        if_match: None,
        if_none_match: None,
        if_none_exist: None,
        full_url: None,
    };
    let entries = vec![post_patient("batch-p1"), post_patient("batch-p2")];
    let result = backend.process_batch(&tenant, entries).await.unwrap();
    assert_eq!(result.entries.len(), 2);
    assert!(result.entries.iter().all(|e| e.status == 201));
}
#[tokio::test]
async fn test_batch_mixed_operations() {
use crate::core::transaction::BundleProvider;
let backend = create_test_backend();
let tenant = create_test_tenant();
backend
.create(
&tenant,
"Patient",
json!({"id": "existing"}),
FhirVersion::default(),
)
.await
.unwrap();
let entries = vec![
BundleEntry {
method: BundleMethod::Get,
url: "Patient/existing".to_string(),
resource: None,
if_match: None,
if_none_match: None,
if_none_exist: None,
full_url: None,
},
BundleEntry {
method: BundleMethod::Post,
url: "Patient".to_string(),
resource: Some(json!({"resourceType": "Patient", "id": "new"})),
if_match: None,
if_none_match: None,
if_none_exist: None,
full_url: None,
},
BundleEntry {
method: BundleMethod::Get,
url: "Patient/nonexistent".to_string(),
resource: None,
if_match: None,
if_none_match: None,
if_none_exist: None,
full_url: None,
},
];
let result = backend.process_batch(&tenant, entries).await.unwrap();
assert_eq!(result.entries.len(), 3);
assert_eq!(result.entries[0].status, 200); assert_eq!(result.entries[1].status, 201); assert_eq!(result.entries[2].status, 404); }
#[tokio::test]
async fn test_batch_delete() {
    use crate::core::transaction::BundleProvider;
    // A DELETE batch entry returns 204, and a subsequent read reports the
    // resource as absent (Ok(None)) or gone (Gone error) — backends may
    // surface deletions either way.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(&tenant, "Patient", json!({"id": "to-delete"}), FhirVersion::default())
        .await
        .unwrap();
    let entries = vec![BundleEntry {
        method: BundleMethod::Delete,
        url: "Patient/to-delete".to_string(),
        resource: None,
        if_match: None,
        if_none_match: None,
        if_none_exist: None,
        full_url: None,
    }];
    let result = backend.process_batch(&tenant, entries).await.unwrap();
    assert_eq!(result.entries.len(), 1);
    assert_eq!(result.entries[0].status, 204);
    match backend.read(&tenant, "Patient", "to-delete").await {
        Ok(None) => {}
        Err(StorageError::Resource(ResourceError::Gone { .. })) => {}
        other => panic!("Expected None or Gone, got {:?}", other),
    }
}
#[tokio::test]
async fn test_transaction_all_or_nothing() {
    use crate::core::transaction::BundleProvider;
    // The second POST collides with an existing id, so the whole
    // transaction must fail and the first POST must be rolled back.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(&tenant, "Patient", json!({"id": "existing"}), FhirVersion::default())
        .await
        .unwrap();
    let post = |id: &str| BundleEntry {
        method: BundleMethod::Post,
        url: "Patient".to_string(),
        resource: Some(json!({"resourceType": "Patient", "id": id})),
        if_match: None,
        if_none_match: None,
        if_none_exist: None,
        full_url: None,
    };
    let outcome = backend
        .process_transaction(&tenant, vec![post("tx-p1"), post("existing")])
        .await;
    assert!(outcome.is_err());
    // Rollback check: the first entry must not have been persisted.
    let read = backend.read(&tenant, "Patient", "tx-p1").await.unwrap();
    assert!(read.is_none());
}
#[tokio::test]
async fn test_transaction_success() {
    use crate::core::transaction::BundleProvider;
    // Two valid POSTs commit atomically; both resources become readable.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    let post = |resource_type: &str, id: &str| BundleEntry {
        method: BundleMethod::Post,
        url: resource_type.to_string(),
        resource: Some(json!({"resourceType": resource_type, "id": id})),
        if_match: None,
        if_none_match: None,
        if_none_exist: None,
        full_url: None,
    };
    let entries = vec![
        post("Patient", "tx-success-1"),
        post("Observation", "tx-success-2"),
    ];
    let result = backend.process_transaction(&tenant, entries).await.unwrap();
    assert_eq!(result.entries.len(), 2);
    assert!(result.entries.iter().all(|e| e.status == 201));
    for (resource_type, id) in [("Patient", "tx-success-1"), ("Observation", "tx-success-2")] {
        assert!(
            backend
                .read(&tenant, resource_type, id)
                .await
                .unwrap()
                .is_some()
        );
    }
}
#[tokio::test]
async fn test_parse_url_formats() {
    // parse_url accepts relative, rooted, and absolute (http/https) URLs,
    // extracting the trailing type/id pair in every case.
    let backend = create_test_backend();
    let cases = [
        ("Patient/123", "Patient", "123"),
        ("/Patient/456", "Patient", "456"),
        ("http://example.com/fhir/Patient/789", "Patient", "789"),
        ("https://example.com/fhir/Observation/obs-1", "Observation", "obs-1"),
    ];
    for (url, expected_type, expected_id) in cases {
        let (rt, id) = backend.parse_url(url).unwrap();
        assert_eq!(rt, expected_type);
        assert_eq!(id, expected_id);
    }
}
#[tokio::test]
async fn test_search_index_display_text_populated() {
    // Verifies that indexing a token search parameter ('code') writes the
    // coding's human-readable `display` text into the search_index row,
    // not just the system/code pair.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Observation",
            json!({
                "resourceType": "Observation",
                "id": "obs-display-test",
                "code": {
                    "coding": [
                        {
                            "system": "http://loinc.org",
                            "code": "8867-4",
                            "display": "Heart rate"
                        }
                    ]
                },
                "status": "final"
            }),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    // Inspect the raw search_index rows through the backing SQLite
    // connection. NOTE(review): the literal 'test-tenant' assumes
    // create_test_tenant() uses that tenant id — confirm against the helper.
    let conn = backend.get_connection().unwrap();
    let mut stmt = conn
        .prepare(
            "SELECT param_name, value_token_system, value_token_code, value_token_display
FROM search_index
WHERE tenant_id = 'test-tenant'
AND resource_id = 'obs-display-test'
AND param_name = 'code'",
        )
        .unwrap();
    // Row tuple: (param_name, token_system, token_code, token_display).
    #[allow(clippy::type_complexity)]
    let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
        .query_map([], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
        })
        .unwrap()
        .filter_map(|r| r.ok())
        .collect();
    assert!(
        !rows.is_empty(),
        "Should have indexed 'code' parameter for Observation"
    );
    // Locate the row for the specific coding and check its display text.
    let entry = rows
        .iter()
        .find(|(_, _, code, _)| code.as_deref() == Some("8867-4"));
    assert!(entry.is_some(), "Should have entry with code 8867-4");
    let (_, _, _, display) = entry.unwrap();
    assert_eq!(
        display.as_deref(),
        Some("Heart rate"),
        "Display text should be 'Heart rate'"
    );
}
#[tokio::test]
async fn test_search_index_identifier_type_populated() {
    // Verifies that indexing an 'identifier' parameter persists the
    // identifier's `type` coding (system + code) into the dedicated
    // value_identifier_type_* columns alongside the identifier value.
    let backend = create_test_backend();
    let tenant = create_test_tenant();
    backend
        .create(
            &tenant,
            "Patient",
            json!({
                "resourceType": "Patient",
                "id": "patient-type-test",
                "identifier": [
                    {
                        "type": {
                            "coding": [
                                {
                                    "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
                                    "code": "MR"
                                }
                            ]
                        },
                        "system": "http://hospital.org/mrn",
                        "value": "MRN12345"
                    }
                ]
            }),
            FhirVersion::default(),
        )
        .await
        .unwrap();
    // Inspect the raw search_index rows through the backing SQLite
    // connection. NOTE(review): the literal 'test-tenant' assumes
    // create_test_tenant() uses that tenant id — confirm against the helper.
    let conn = backend.get_connection().unwrap();
    let mut stmt = conn
        .prepare(
            "SELECT param_name, value_token_code, value_identifier_type_system, value_identifier_type_code
FROM search_index
WHERE tenant_id = 'test-tenant'
AND resource_id = 'patient-type-test'
AND param_name = 'identifier'",
        )
        .unwrap();
    // Row tuple: (param_name, token_code, identifier_type_system, identifier_type_code).
    #[allow(clippy::type_complexity)]
    let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
        .query_map([], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
        })
        .unwrap()
        .filter_map(|r| r.ok())
        .collect();
    assert!(
        !rows.is_empty(),
        "Should have indexed 'identifier' parameter for Patient"
    );
    // Locate the row for the specific identifier value and check its type columns.
    let entry = rows
        .iter()
        .find(|(_, code, _, _)| code.as_deref() == Some("MRN12345"));
    assert!(entry.is_some(), "Should have entry with value MRN12345");
    let (_, _, type_system, type_code) = entry.unwrap();
    assert_eq!(
        type_system.as_deref(),
        Some("http://terminology.hl7.org/CodeSystem/v2-0203"),
        "Identifier type system should be populated"
    );
    assert_eq!(
        type_code.as_deref(),
        Some("MR"),
        "Identifier type code should be populated"
    );
}
}