use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use parking_lot::RwLock;
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use serde::{Deserialize, Serialize};
use helios_fhir::FhirVersion;
use crate::core::{Backend, BackendCapability, BackendKind};
use crate::error::{BackendError, StorageResult};
use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
use super::schema;
/// Monotonic counter used to mint a unique shared-cache URI per in-memory
/// backend (see `SqliteBackend::with_config`), so separate in-memory
/// backends never collide while each backend's pool shares one database.
static MEMORY_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
/// FHIR storage backend backed by SQLite through an r2d2 connection pool.
pub struct SqliteBackend {
    /// Pool of SQLite connections shared by all operations.
    pool: Pool<SqliteConnectionManager>,
    /// Configuration captured at construction time.
    config: SqliteBackendConfig,
    /// True when the backend was opened with the `":memory:"` sentinel path.
    is_memory: bool,
    /// Registry of known SearchParameter definitions; shared with the extractor.
    search_registry: Arc<RwLock<SearchParameterRegistry>>,
    /// Extractor built over the same registry.
    // NOTE(review): assumed to derive indexable search values from resources
    // — confirm against SearchParameterExtractor's own documentation.
    search_extractor: Arc<SearchParameterExtractor>,
}
impl Debug for SqliteBackend {
    /// Debug output omits the pool and extractor (neither is `Debug`);
    /// the registry is summarized by its entry count only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Take the registry length up front so the read lock is held only
        // for the `len()` call, not for the whole formatting pass.
        let registry_len = self.search_registry.read().len();
        let mut builder = f.debug_struct("SqliteBackend");
        builder
            .field("config", &self.config)
            .field("is_memory", &self.is_memory)
            .field("search_registry_len", &registry_len);
        builder.finish_non_exhaustive()
    }
}
/// Configuration for the SQLite backend; deserializable with serde, and
/// every field has a default so an empty config document is valid.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SqliteBackendConfig {
    /// Maximum number of pooled connections (default 10).
    #[serde(default = "default_max_connections")]
    pub max_connections: u32,
    /// Minimum number of idle connections the pool maintains (default 1).
    #[serde(default = "default_min_connections")]
    pub min_connections: u32,
    /// Pool checkout timeout in milliseconds (default 30000).
    #[serde(default = "default_connection_timeout_ms")]
    pub connection_timeout_ms: u64,
    /// SQLite busy timeout in milliseconds (default 5000).
    #[serde(default = "default_busy_timeout_ms")]
    pub busy_timeout_ms: u32,
    /// Enable WAL journal mode; only applied to file-backed databases
    /// (default true).
    #[serde(default = "default_true")]
    pub enable_wal: bool,
    /// Enable SQLite foreign-key enforcement (default true).
    #[serde(default = "default_true")]
    pub enable_foreign_keys: bool,
    /// FHIR specification version used when loading SearchParameters.
    #[serde(default)]
    pub fhir_version: FhirVersion,
    /// Directory searched for SearchParameter spec/custom files
    /// (falls back to "./data" when `None`).
    #[serde(default)]
    pub data_dir: Option<PathBuf>,
    /// When true, search handling is delegated elsewhere.
    // NOTE(review): semantics inferred from the flag name only — confirm
    // with the component that reads `is_search_offloaded()`.
    #[serde(default)]
    pub search_offloaded: bool,
}
/// Serde default: maximum pool size.
fn default_max_connections() -> u32 { 10 }

/// Serde default: minimum number of idle pooled connections.
fn default_min_connections() -> u32 { 1 }

/// Serde default: pool checkout timeout in milliseconds (30 s).
fn default_connection_timeout_ms() -> u64 { 30000 }

/// Serde default: SQLite busy timeout in milliseconds (5 s).
fn default_busy_timeout_ms() -> u32 { 5000 }

/// Serde default for boolean flags that are on unless configured off.
fn default_true() -> bool { true }
impl Default for SqliteBackendConfig {
fn default() -> Self {
Self {
max_connections: default_max_connections(),
min_connections: default_min_connections(),
connection_timeout_ms: default_connection_timeout_ms(),
busy_timeout_ms: default_busy_timeout_ms(),
enable_wal: true,
enable_foreign_keys: true,
fhir_version: FhirVersion::default(),
data_dir: None,
search_offloaded: false,
}
}
}
impl SqliteBackend {
    /// Creates a backend over a fresh, private in-memory database using the
    /// default configuration.
    pub fn in_memory() -> StorageResult<Self> {
        Self::with_config(":memory:", SqliteBackendConfig::default())
    }

    /// Opens (or creates) a file-backed database at `path` using the default
    /// configuration.
    pub fn open<P: AsRef<Path>>(path: P) -> StorageResult<Self> {
        Self::with_config(path, SqliteBackendConfig::default())
    }

    /// Builds the connection pool, populates the SearchParameter registry
    /// (embedded fallback definitions, then the spec file, then custom files
    /// from the data directory), and applies the configured PRAGMAs.
    ///
    /// The special path `":memory:"` selects an in-memory database. Each
    /// in-memory backend is given a unique
    /// `file:hfs_mem_<n>?mode=memory&cache=shared` URI so that all pooled
    /// connections of one backend share one database while distinct backends
    /// stay isolated.
    ///
    /// # Errors
    /// Returns `StorageError::Backend(ConnectionFailed)` when the pool cannot
    /// be built, or whatever `configure_connection` reports when the PRAGMAs
    /// cannot be applied. Registry-loading problems are logged, not fatal.
    pub fn with_config<P: AsRef<Path>>(
        path: P,
        config: SqliteBackendConfig,
    ) -> StorageResult<Self> {
        let path_str = path.as_ref().to_string_lossy();
        let is_memory = path_str == ":memory:";
        let manager = if is_memory {
            // Relaxed is sufficient: the counter only needs uniqueness,
            // not ordering with other memory operations.
            let db_id = MEMORY_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
            let uri = format!("file:hfs_mem_{}?mode=memory&cache=shared", db_id);
            SqliteConnectionManager::file(uri)
        } else {
            SqliteConnectionManager::file(path.as_ref())
        };
        let pool = Pool::builder()
            .max_size(config.max_connections)
            .min_idle(Some(config.min_connections))
            .connection_timeout(std::time::Duration::from_millis(
                config.connection_timeout_ms,
            ))
            .build(manager)
            .map_err(|e| {
                crate::error::StorageError::Backend(BackendError::ConnectionFailed {
                    backend_name: "sqlite".to_string(),
                    message: e.to_string(),
                })
            })?;
        let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
        // Populate the registry from three sources; the per-source counters
        // exist only to feed the summary log line at the end of this scope.
        {
            let loader = SearchParameterLoader::new(config.fhir_version);
            let mut registry = search_registry.write();
            let mut fallback_count = 0;
            let mut spec_count = 0;
            let mut spec_file: Option<PathBuf> = None;
            let mut custom_count = 0;
            let mut custom_files: Vec<String> = Vec::new();
            // Source 1: embedded (compiled-in) fallback definitions.
            match loader.load_embedded() {
                Ok(params) => {
                    for param in params {
                        if registry.register(param).is_ok() {
                            fallback_count += 1;
                        }
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to load embedded SearchParameters: {}", e);
                }
            }
            // Source 2: the versioned spec file under the data directory.
            let data_dir = config
                .data_dir
                .clone()
                .unwrap_or_else(|| PathBuf::from("./data"));
            let spec_filename = loader.spec_filename();
            let spec_path = data_dir.join(spec_filename);
            match loader.load_from_spec_file(&data_dir) {
                Ok(params) => {
                    for param in params {
                        if registry.register(param).is_ok() {
                            spec_count += 1;
                        }
                    }
                    if spec_count > 0 {
                        spec_file = Some(spec_path);
                    }
                }
                Err(e) => {
                    // A missing/unreadable spec file is not fatal: the
                    // embedded fallback parameters remain in effect.
                    tracing::warn!(
                        "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
                        spec_path.display(),
                        e
                    );
                }
            }
            // Source 3: custom SearchParameter files from the same directory.
            match loader.load_custom_from_directory_with_files(&data_dir) {
                Ok((params, files)) => {
                    for param in params {
                        if registry.register(param).is_ok() {
                            custom_count += 1;
                        }
                    }
                    custom_files = files;
                }
                Err(e) => {
                    tracing::warn!(
                        "Error loading custom SearchParameters from {}: {}",
                        data_dir.display(),
                        e
                    );
                }
            }
            let resource_type_count = registry.resource_types().len();
            let spec_info = spec_file
                .map(|p| format!(" from {}", p.display()))
                .unwrap_or_default();
            let custom_info = if custom_files.is_empty() {
                String::new()
            } else {
                format!(" [{}]", custom_files.join(", "))
            };
            tracing::info!(
                "SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
                registry.len(),
                spec_count,
                spec_info,
                fallback_count,
                custom_count,
                custom_info,
                resource_type_count
            );
        }
        let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
        let backend = Self {
            pool,
            config,
            is_memory,
            search_registry,
            search_extractor,
        };
        backend.configure_connection()?;
        Ok(backend)
    }

    /// Creates the database schema and merges any SearchParameter resources
    /// already stored in the database into the in-memory registry. Safe to
    /// call repeatedly (exercised by `test_backend_initialization`).
    pub fn init_schema(&self) -> StorageResult<()> {
        let conn = self.get_connection()?;
        schema::initialize_schema(&conn)?;
        let stored_count = self.load_stored_search_parameters()?;
        if stored_count > 0 {
            let registry = self.search_registry.read();
            tracing::info!(
                "Loaded {} stored SearchParameters from database (total now: {})",
                stored_count,
                registry.len()
            );
        }
        Ok(())
    }

    /// Loads non-deleted `SearchParameter` resources from the `resources`
    /// table and registers the active ones (tagged `Stored`) in the registry.
    ///
    /// Best-effort per row: rows that fail to read or parse are logged and
    /// skipped. Returns the number of definitions successfully registered.
    fn load_stored_search_parameters(&self) -> StorageResult<usize> {
        use crate::search::registry::{SearchParameterSource, SearchParameterStatus};
        let conn = self.get_connection()?;
        let mut stmt = conn
            .prepare(
                "SELECT data FROM resources WHERE resource_type = 'SearchParameter' AND is_deleted = 0",
            )
            .map_err(|e| {
                crate::error::StorageError::Backend(BackendError::Internal {
                    backend_name: "sqlite".to_string(),
                    message: format!("Failed to prepare SearchParameter query: {}", e),
                    source: None,
                })
            })?;
        let loader = SearchParameterLoader::new(self.config.fhir_version);
        let mut registry = self.search_registry.write();
        let mut count = 0;
        let rows = stmt
            .query_map([], |row| row.get::<_, Vec<u8>>(0))
            .map_err(|e| {
                crate::error::StorageError::Backend(BackendError::Internal {
                    backend_name: "sqlite".to_string(),
                    message: format!("Failed to query SearchParameters: {}", e),
                    source: None,
                })
            })?;
        for row in rows {
            let data = match row {
                Ok(data) => data,
                Err(e) => {
                    tracing::warn!("Failed to read SearchParameter row: {}", e);
                    continue;
                }
            };
            let json: serde_json::Value = match serde_json::from_slice(&data) {
                Ok(json) => json,
                Err(e) => {
                    tracing::warn!("Failed to parse SearchParameter JSON: {}", e);
                    continue;
                }
            };
            match loader.parse_resource(&json) {
                Ok(mut def) => {
                    // Only active definitions take effect; mark their origin
                    // so stored definitions are distinguishable later.
                    if def.status == SearchParameterStatus::Active {
                        def.source = SearchParameterSource::Stored;
                        if registry.register(def).is_ok() {
                            count += 1;
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to parse stored SearchParameter: {}", e);
                }
            }
        }
        Ok(count)
    }

    /// Checks a connection out of the pool, mapping pool errors to
    /// `ConnectionFailed`.
    pub(crate) fn get_connection(
        &self,
    ) -> StorageResult<PooledConnection<SqliteConnectionManager>> {
        self.pool.get().map_err(|e| {
            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
                backend_name: "sqlite".to_string(),
                message: e.to_string(),
            })
        })
    }

    /// Returns a new handle to the shared SearchParameter registry.
    pub(crate) fn get_search_registry(&self) -> Arc<RwLock<SearchParameterRegistry>> {
        Arc::clone(&self.search_registry)
    }

    /// Applies the configured settings: busy timeout, optional foreign-key
    /// enforcement, and WAL (file-backed databases only — note the
    /// `!self.is_memory` guard below).
    // NOTE(review): these are applied to a single connection checked out of
    // the pool. `busy_timeout` and `foreign_keys` are per-connection settings
    // in SQLite, so other pooled connections may not carry them; consider
    // `SqliteConnectionManager::with_init` instead — TODO confirm.
    fn configure_connection(&self) -> StorageResult<()> {
        let conn = self.get_connection()?;
        conn.busy_timeout(std::time::Duration::from_millis(
            self.config.busy_timeout_ms as u64,
        ))
        .map_err(|e| {
            crate::error::StorageError::Backend(BackendError::Internal {
                backend_name: "sqlite".to_string(),
                message: format!("Failed to set busy timeout: {}", e),
                source: None,
            })
        })?;
        if self.config.enable_foreign_keys {
            conn.execute("PRAGMA foreign_keys = ON", []).map_err(|e| {
                crate::error::StorageError::Backend(BackendError::Internal {
                    backend_name: "sqlite".to_string(),
                    message: format!("Failed to enable foreign keys: {}", e),
                    source: None,
                })
            })?;
        }
        if self.config.enable_wal && !self.is_memory {
            // `PRAGMA journal_mode` returns the resulting mode as a row, so
            // it must go through query_row rather than execute.
            conn.query_row("PRAGMA journal_mode = WAL", [], |_row| Ok(()))
                .map_err(|e| {
                    crate::error::StorageError::Backend(BackendError::Internal {
                        backend_name: "sqlite".to_string(),
                        message: format!("Failed to enable WAL mode: {}", e),
                        source: None,
                    })
                })?;
        }
        Ok(())
    }

    /// True when this backend runs against an in-memory database.
    pub fn is_memory(&self) -> bool {
        self.is_memory
    }

    /// The configuration this backend was constructed with.
    pub fn config(&self) -> &SqliteBackendConfig {
        &self.config
    }

    /// Shared SearchParameter registry handle.
    pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
        &self.search_registry
    }

    /// Shared search-parameter extractor handle.
    pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
        &self.search_extractor
    }

    /// Reads the search-offloaded flag from the configuration.
    pub fn is_search_offloaded(&self) -> bool {
        self.config.search_offloaded
    }

    /// Sets the search-offloaded flag in the configuration.
    pub fn set_search_offloaded(&mut self, offloaded: bool) {
        self.config.search_offloaded = offloaded;
    }
}
/// Wrapper around a pooled SQLite connection; serves as
/// `<SqliteBackend as Backend>::Connection`.
#[allow(dead_code)]
pub struct SqliteConnection(pub(crate) PooledConnection<SqliteConnectionManager>);
impl Debug for SqliteConnection {
    /// Opaque representation: the pooled connection inside is not `Debug`,
    /// so only the type name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SqliteConnection");
        builder.finish()
    }
}
#[async_trait]
impl Backend for SqliteBackend {
type Connection = SqliteConnection;
fn kind(&self) -> BackendKind {
BackendKind::Sqlite
}
fn name(&self) -> &'static str {
"sqlite"
}
fn supports(&self, capability: BackendCapability) -> bool {
matches!(
capability,
BackendCapability::Crud
| BackendCapability::Versioning
| BackendCapability::InstanceHistory
| BackendCapability::TypeHistory
| BackendCapability::SystemHistory
| BackendCapability::BasicSearch
| BackendCapability::DateSearch
| BackendCapability::ReferenceSearch
| BackendCapability::Sorting
| BackendCapability::OffsetPagination
| BackendCapability::Transactions
| BackendCapability::OptimisticLocking
| BackendCapability::Include
| BackendCapability::Revinclude
| BackendCapability::SharedSchema
)
}
fn capabilities(&self) -> Vec<BackendCapability> {
vec![
BackendCapability::Crud,
BackendCapability::Versioning,
BackendCapability::InstanceHistory,
BackendCapability::TypeHistory,
BackendCapability::SystemHistory,
BackendCapability::BasicSearch,
BackendCapability::DateSearch,
BackendCapability::ReferenceSearch,
BackendCapability::Sorting,
BackendCapability::OffsetPagination,
BackendCapability::Transactions,
BackendCapability::OptimisticLocking,
BackendCapability::Include,
BackendCapability::Revinclude,
BackendCapability::SharedSchema,
]
}
async fn acquire(&self) -> Result<Self::Connection, BackendError> {
let conn = self
.pool
.get()
.map_err(|e| BackendError::ConnectionFailed {
backend_name: "sqlite".to_string(),
message: e.to_string(),
})?;
Ok(SqliteConnection(conn))
}
async fn release(&self, _conn: Self::Connection) {
}
async fn health_check(&self) -> Result<(), BackendError> {
let conn = self
.get_connection()
.map_err(|_| BackendError::Unavailable {
backend_name: "sqlite".to_string(),
message: "Failed to get connection".to_string(),
})?;
conn.query_row("SELECT 1", [], |_| Ok(()))
.map_err(|e| BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Health check failed: {}", e),
source: None,
})?;
Ok(())
}
async fn initialize(&self) -> Result<(), BackendError> {
self.init_schema().map_err(|e| BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to initialize schema: {}", e),
source: None,
})
}
async fn migrate(&self) -> Result<(), BackendError> {
self.init_schema().map_err(|e| BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to run migrations: {}", e),
source: None,
})
}
}
use crate::core::capabilities::{
GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
};
use crate::types::{
IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
SearchParamType, SpecialSearchParam,
};
impl SearchCapabilityProvider for SqliteBackend {
fn resource_search_capabilities(
&self,
resource_type: &str,
) -> Option<ResourceSearchCapabilities> {
let params = {
let registry = self.search_registry.read();
registry.get_active_params(resource_type)
};
if params.is_empty() {
let common_params = {
let registry = self.search_registry.read();
registry.get_active_params("Resource")
};
if common_params.is_empty() {
return None;
}
}
let mut search_params = Vec::new();
for param in ¶ms {
let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
.with_definition(¶m.url);
let modifiers = Self::modifiers_for_type(param.param_type);
cap = cap.with_modifiers(modifiers);
if let Some(ref targets) = param.target {
cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
}
search_params.push(cap);
}
let common_params = {
let registry = self.search_registry.read();
registry.get_active_params("Resource")
};
for param in &common_params {
if !search_params.iter().any(|p| p.name == param.code) {
let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
.with_definition(¶m.url);
cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
search_params.push(cap);
}
}
Some(
ResourceSearchCapabilities::new(resource_type)
.with_special_params(vec![
SpecialSearchParam::Id,
SpecialSearchParam::LastUpdated,
SpecialSearchParam::Tag,
SpecialSearchParam::Profile,
SpecialSearchParam::Security,
])
.with_include_capabilities(vec![
IncludeCapability::Include,
IncludeCapability::Revinclude,
])
.with_pagination_capabilities(vec![
PaginationCapability::Count,
PaginationCapability::Offset,
PaginationCapability::Cursor,
PaginationCapability::MaxPageSize(1000),
PaginationCapability::DefaultPageSize(20),
])
.with_result_mode_capabilities(vec![
ResultModeCapability::Total,
ResultModeCapability::TotalNone,
ResultModeCapability::TotalAccurate,
ResultModeCapability::SummaryCount,
])
.with_param_list(search_params),
)
}
fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
GlobalSearchCapabilities::new()
.with_special_params(vec![
SpecialSearchParam::Id,
SpecialSearchParam::LastUpdated,
SpecialSearchParam::Tag,
SpecialSearchParam::Profile,
SpecialSearchParam::Security,
])
.with_pagination(vec![
PaginationCapability::Count,
PaginationCapability::Offset,
PaginationCapability::Cursor,
PaginationCapability::MaxPageSize(1000),
PaginationCapability::DefaultPageSize(20),
])
.with_system_search()
}
}
impl SqliteBackend {
fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
match param_type {
SearchParamType::String => vec!["exact", "contains", "missing"],
SearchParamType::Token => vec!["not", "text", "in", "not-in", "of-type", "missing"],
SearchParamType::Reference => vec!["identifier", "missing"],
SearchParamType::Date => vec!["missing"],
SearchParamType::Number => vec!["missing"],
SearchParamType::Quantity => vec!["missing"],
SearchParamType::Uri => vec!["below", "above", "missing"],
SearchParamType::Composite => vec!["missing"],
SearchParamType::Special => vec![],
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Basic construction and identity of an in-memory backend.
    #[test]
    fn test_in_memory_backend() {
        let backend = SqliteBackend::in_memory().unwrap();
        assert!(backend.is_memory());
        assert_eq!(backend.name(), "sqlite");
        assert_eq!(backend.kind(), BackendKind::Sqlite);
    }

    // init_schema must be idempotent: a second call succeeds too.
    #[test]
    fn test_backend_initialization() {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend.init_schema().unwrap();
    }

    // Supported capabilities answer true; an unimplemented one answers false.
    #[test]
    fn test_backend_capabilities() {
        let backend = SqliteBackend::in_memory().unwrap();
        assert!(backend.supports(BackendCapability::Crud));
        assert!(backend.supports(BackendCapability::BasicSearch));
        assert!(backend.supports(BackendCapability::Transactions));
        assert!(!backend.supports(BackendCapability::FullTextSearch));
    }

    // health_check passes once the schema is in place.
    #[tokio::test]
    async fn test_health_check() {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        assert!(backend.health_check().await.is_ok());
    }

    // acquire/release round-trips a pooled connection without panicking.
    #[tokio::test]
    async fn test_acquire_release() {
        let backend = SqliteBackend::in_memory().unwrap();
        let conn = backend.acquire().await.unwrap();
        backend.release(conn).await;
    }

    // Patient should have capabilities straight after construction
    // (embedded/spec parameters are loaded in with_config, before init_schema).
    #[test]
    fn test_search_capability_provider_patient() {
        let backend = SqliteBackend::in_memory().unwrap();
        let caps = backend.resource_search_capabilities("Patient");
        assert!(caps.is_some(), "Should have capabilities for Patient");
        let caps = caps.unwrap();
        assert_eq!(caps.resource_type, "Patient");
        assert!(caps.supports_special(SpecialSearchParam::Id));
        assert!(caps.supports_special(SpecialSearchParam::LastUpdated));
        assert!(caps.supports_include(IncludeCapability::Include));
        assert!(caps.supports_include(IncludeCapability::Revinclude));
        assert!(
            !caps.search_params.is_empty(),
            "Should have search parameters"
        );
    }

    // Global capabilities expose the common special params and system search.
    #[test]
    fn test_global_search_capabilities() {
        let backend = SqliteBackend::in_memory().unwrap();
        let global = backend.global_search_capabilities();
        assert!(
            global
                .common_special_params
                .contains(&SpecialSearchParam::Id)
        );
        assert!(
            global
                .common_special_params
                .contains(&SpecialSearchParam::LastUpdated)
        );
        assert!(global.supports_system_search);
        assert!(!global.common_pagination_capabilities.is_empty());
    }

    // Spot-check the modifier table for each parameter-type family.
    #[test]
    fn test_modifiers_for_type() {
        let string_mods = SqliteBackend::modifiers_for_type(SearchParamType::String);
        assert!(string_mods.contains(&"exact"));
        assert!(string_mods.contains(&"contains"));
        assert!(string_mods.contains(&"missing"));
        let token_mods = SqliteBackend::modifiers_for_type(SearchParamType::Token);
        assert!(token_mods.contains(&"not"));
        assert!(token_mods.contains(&"text"));
        let ref_mods = SqliteBackend::modifiers_for_type(SearchParamType::Reference);
        assert!(ref_mods.contains(&"identifier"));
        let uri_mods = SqliteBackend::modifiers_for_type(SearchParamType::Uri);
        assert!(uri_mods.contains(&"below"));
        assert!(uri_mods.contains(&"above"));
    }
}