use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use deadpool_postgres::{Config, Pool, Runtime, SslMode};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio_postgres::NoTls;
use helios_fhir::FhirVersion;
use crate::core::{Backend, BackendCapability, BackendKind};
use crate::error::{BackendError, StorageResult};
use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
/// Storage backend backed by a pooled PostgreSQL connection.
///
/// Besides the connection pool, the backend owns an in-memory
/// SearchParameter registry and an extractor that shares that registry.
pub struct PostgresBackend {
    /// Connection pool built by `create_pool`.
    pool: Pool,
    /// Configuration the backend was constructed with.
    config: PostgresConfig,
    /// Shared, lock-protected registry of SearchParameter definitions.
    search_registry: Arc<RwLock<SearchParameterRegistry>>,
    /// Extractor constructed over a clone of `search_registry`'s `Arc`.
    search_extractor: Arc<SearchParameterExtractor>,
}
/// Debug output shows the configuration and the number of registered
/// search parameters; the pool and extractor are elided (non-exhaustive).
impl Debug for PostgresBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read the size up front so the registry lock is released before formatting.
        let registry_len = self.search_registry.read().len();

        let mut out = f.debug_struct("PostgresBackend");
        out.field("config", &self.config);
        out.field("search_registry_len", &registry_len);
        out.finish_non_exhaustive()
    }
}
/// Connection and behaviour settings for the PostgreSQL backend.
///
/// Every field carries a serde default, so a partially specified document
/// deserializes with the same fallback values that `Default` produces.
///
/// `Debug` is implemented by hand (instead of derived) so that the password
/// is never written to logs or panic messages.
#[derive(Clone, Serialize, Deserialize)]
pub struct PostgresConfig {
    /// Database server host name.
    #[serde(default = "default_host")]
    pub host: String,
    /// Database server TCP port.
    #[serde(default = "default_port")]
    pub port: u16,
    /// Name of the database to connect to.
    #[serde(default = "default_dbname")]
    pub dbname: String,
    /// Role used to authenticate.
    #[serde(default = "default_user")]
    pub user: String,
    /// Optional password for `user`.
    #[serde(default)]
    pub password: Option<String>,
    /// TLS negotiation mode handed to the connection pool.
    #[serde(default)]
    pub ssl_mode: PostgresSslMode,
    /// Upper bound on the number of pooled connections.
    #[serde(default = "default_max_connections")]
    pub max_connections: usize,
    /// Intended connection timeout, in seconds.
    #[serde(default = "default_connect_timeout_secs")]
    pub connect_timeout_secs: u64,
    /// Value used for PostgreSQL's `statement_timeout` setting (milliseconds).
    #[serde(default = "default_statement_timeout_ms")]
    pub statement_timeout_ms: u64,
    /// FHIR version used when loading SearchParameter definitions.
    #[serde(default)]
    pub fhir_version: FhirVersion,
    /// Directory searched for spec and custom SearchParameter files;
    /// `./data` is used when unset.
    #[serde(default)]
    pub data_dir: Option<PathBuf>,
    /// Flag reported by `PostgresBackend::is_search_offloaded`.
    // NOTE(review): presumably means search is served by another backend — confirm.
    #[serde(default)]
    pub search_offloaded: bool,
    /// Optional schema name.
    // NOTE(review): not read by any code visible in this file — confirm intended use.
    #[serde(default)]
    pub schema_name: Option<String>,
}

impl std::fmt::Debug for PostgresConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None shape visible but never the secret itself.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("PostgresConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("dbname", &self.dbname)
            .field("user", &self.user)
            .field("password", &password)
            .field("ssl_mode", &self.ssl_mode)
            .field("max_connections", &self.max_connections)
            .field("connect_timeout_secs", &self.connect_timeout_secs)
            .field("statement_timeout_ms", &self.statement_timeout_ms)
            .field("fhir_version", &self.fhir_version)
            .field("data_dir", &self.data_dir)
            .field("search_offloaded", &self.search_offloaded)
            .field("schema_name", &self.schema_name)
            .finish()
    }
}
/// TLS negotiation mode for PostgreSQL connections.
///
/// Serialized in lowercase (`disable`, `prefer`, `require`). Each variant is
/// mapped onto the `deadpool_postgres::SslMode` variant of the same name when
/// the pool is created.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum PostgresSslMode {
    /// Maps to `SslMode::Disable`.
    Disable,
    /// Maps to `SslMode::Prefer`; this is the default variant.
    #[default]
    Prefer,
    /// Maps to `SslMode::Require`.
    // NOTE(review): the pool is built with `NoTls`, so this mode presumably cannot
    // succeed until a TLS connector is wired in — confirm.
    Require,
}
/// Serde/`Default` fallback for the database host.
fn default_host() -> String {
    String::from("localhost")
}
/// Serde/`Default` fallback for the database port.
fn default_port() -> u16 {
    const FALLBACK_PORT: u16 = 5432;
    FALLBACK_PORT
}
/// Serde/`Default` fallback for the database name.
fn default_dbname() -> String {
    String::from("helios")
}
/// Serde/`Default` fallback for the database user.
fn default_user() -> String {
    String::from("helios")
}
/// Serde/`Default` fallback for the pool size.
fn default_max_connections() -> usize {
    const FALLBACK_POOL_SIZE: usize = 10;
    FALLBACK_POOL_SIZE
}
/// Serde/`Default` fallback for the connect timeout, in seconds.
fn default_connect_timeout_secs() -> u64 {
    const FALLBACK_CONNECT_TIMEOUT_SECS: u64 = 5;
    FALLBACK_CONNECT_TIMEOUT_SECS
}
/// Serde/`Default` fallback for the statement timeout, in milliseconds.
fn default_statement_timeout_ms() -> u64 {
    const FALLBACK_STATEMENT_TIMEOUT_MS: u64 = 30_000;
    FALLBACK_STATEMENT_TIMEOUT_MS
}
/// Programmatic defaults; these mirror the per-field serde defaults.
impl Default for PostgresConfig {
    fn default() -> Self {
        // Fields with dedicated fallback functions.
        let host = default_host();
        let port = default_port();
        let dbname = default_dbname();
        let user = default_user();
        let max_connections = default_max_connections();
        let connect_timeout_secs = default_connect_timeout_secs();
        let statement_timeout_ms = default_statement_timeout_ms();

        Self {
            host,
            port,
            dbname,
            user,
            password: None,
            ssl_mode: Default::default(),
            max_connections,
            connect_timeout_secs,
            statement_timeout_ms,
            fhir_version: Default::default(),
            data_dir: None,
            search_offloaded: false,
            schema_name: None,
        }
    }
}
impl PostgresBackend {
/// Builds a backend from `config`.
///
/// Creates the pool, checks out one connection to prove the database is
/// reachable, issues `SET statement_timeout` on it, and then populates the
/// in-memory SearchParameter registry.
///
/// # Errors
/// Returns `ConnectionFailed` when the pool cannot be built or no connection
/// can be obtained, and `Internal` when the `SET` statement fails.
pub async fn new(config: PostgresConfig) -> StorageResult<Self> {
    let pool = Self::create_pool(&config)?;

    {
        let probe = pool.get().await.map_err(|err| {
            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
                backend_name: "postgres".to_string(),
                message: err.to_string(),
            })
        })?;

        // NOTE(review): `SET` is session-scoped, so this presumably only affects
        // the single connection checked out here — confirm.
        let set_timeout_sql = format!("SET statement_timeout = {}", config.statement_timeout_ms);
        probe
            .execute(&set_timeout_sql, &[])
            .await
            .map_err(|err| {
                crate::error::StorageError::Backend(BackendError::Internal {
                    backend_name: "postgres".to_string(),
                    message: format!("Failed to set statement_timeout: {}", err),
                    source: None,
                })
            })?;
        // `probe` goes back to the pool at the end of this scope.
    }

    let registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
    Self::initialize_search_registry(&registry, &config);
    let extractor = Arc::new(SearchParameterExtractor::new(Arc::clone(&registry)));

    Ok(Self {
        pool,
        config,
        search_registry: registry,
        search_extractor: extractor,
    })
}
/// Parses `url` into a [`PostgresConfig`] and constructs the backend from it.
///
/// # Errors
/// Propagates any error from parsing or from [`PostgresBackend::new`].
pub async fn from_connection_string(url: &str) -> StorageResult<Self> {
    match Self::parse_connection_string(url) {
        Ok(config) => Self::new(config).await,
        Err(err) => Err(err),
    }
}
pub async fn from_env() -> StorageResult<Self> {
let config = PostgresConfig {
host: std::env::var("HFS_PG_HOST").unwrap_or_else(|_| default_host()),
port: std::env::var("HFS_PG_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or_else(default_port),
dbname: std::env::var("HFS_PG_DBNAME").unwrap_or_else(|_| default_dbname()),
user: std::env::var("HFS_PG_USER").unwrap_or_else(|_| default_user()),
password: std::env::var("HFS_PG_PASSWORD").ok(),
max_connections: std::env::var("HFS_PG_MAX_CONNECTIONS")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or_else(default_max_connections),
..Default::default()
};
Self::new(config).await
}
/// Builds the `deadpool_postgres` pool described by `config`.
///
/// Besides host, credentials and pool size, this applies two settings to
/// every connection the pool opens:
/// * `connect_timeout_secs` as the connect timeout (a value of 0 leaves the
///   timeout unset), and
/// * `statement_timeout_ms` as a `-c statement_timeout=…` startup option, so
///   the limit holds on all pooled sessions rather than on one.
///
/// # Errors
/// Returns `Internal` if the pool builder cannot be created and
/// `ConnectionFailed` if building the pool fails.
fn create_pool(config: &PostgresConfig) -> StorageResult<Pool> {
    let mut cfg = Config::new();
    cfg.host = Some(config.host.clone());
    cfg.port = Some(config.port);
    cfg.dbname = Some(config.dbname.clone());
    cfg.user = Some(config.user.clone());
    cfg.password = config.password.clone();
    cfg.ssl_mode = Some(match config.ssl_mode {
        PostgresSslMode::Disable => SslMode::Disable,
        PostgresSslMode::Prefer => SslMode::Prefer,
        PostgresSslMode::Require => SslMode::Require,
    });
    // A zero duration would time out immediately, so treat 0 as "no timeout".
    if config.connect_timeout_secs > 0 {
        cfg.connect_timeout = Some(std::time::Duration::from_secs(
            config.connect_timeout_secs,
        ));
    }
    // Startup options are sent on every new session, unlike a one-off `SET`.
    cfg.options = Some(format!(
        "-c statement_timeout={}",
        config.statement_timeout_ms
    ));

    // NOTE(review): the connector is `NoTls`, so `PostgresSslMode::Require`
    // presumably cannot succeed until a TLS connector is supplied — confirm.
    let pool = cfg
        .builder(NoTls)
        .map_err(|e| {
            crate::error::StorageError::Backend(BackendError::Internal {
                backend_name: "postgres".to_string(),
                message: format!("Failed to create pool builder: {}", e),
                source: None,
            })
        })?
        .max_size(config.max_connections)
        .runtime(Runtime::Tokio1)
        .build()
        .map_err(|e| {
            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
                backend_name: "postgres".to_string(),
                message: e.to_string(),
            })
        })?;
    Ok(pool)
}
/// Parses a `postgres://` / `postgresql://` URL into a [`PostgresConfig`].
///
/// Accepted shape: `[scheme://][user[:password]@]host[:port][/dbname][?query]`.
/// * The user-info part is optional; URLs without `@` are still parsed.
/// * IPv6 hosts may be written in brackets (`[::1]:5432`).
/// * Anything from the first `?` after the user-info is ignored.
/// * Any component that is missing, empty, or (for the port) not a valid
///   number keeps its value from `PostgresConfig::default()`.
///
/// Components are taken literally; percent-encoded characters are not decoded.
///
/// Currently always returns `Ok`.
fn parse_connection_string(url: &str) -> StorageResult<PostgresConfig> {
    let without_scheme = url
        .strip_prefix("postgres://")
        .or_else(|| url.strip_prefix("postgresql://"))
        .unwrap_or(url);

    let mut config = PostgresConfig::default();

    // Split off optional credentials; what remains is `host[:port][/dbname]`.
    let location = match without_scheme.split_once('@') {
        Some((userinfo, location)) => {
            let (user, password) = match userinfo.split_once(':') {
                Some((user, password)) => (user, Some(password)),
                None => (userinfo, None),
            };
            if !user.is_empty() {
                config.user = user.to_string();
            }
            if let Some(password) = password {
                config.password = Some(password.to_string());
            }
            location
        }
        None => without_scheme,
    };

    // Query parameters are not interpreted; keep them out of the database name.
    let location = location.split('?').next().unwrap_or(location);

    let (hostport, dbname) = match location.split_once('/') {
        Some((hostport, dbname)) => (hostport, dbname),
        None => (location, ""),
    };

    // Bracketed IPv6 literals contain ':' themselves, so handle them first.
    let (host, port) = match hostport.strip_prefix('[') {
        Some(bracketed) => match bracketed.split_once(']') {
            Some((host, tail)) => (host, tail.strip_prefix(':')),
            None => (hostport, None),
        },
        None => match hostport.split_once(':') {
            Some((host, port)) => (host, Some(port)),
            None => (hostport, None),
        },
    };

    if !host.is_empty() {
        config.host = host.to_string();
    }
    // An unparsable port keeps the default already stored in `config`.
    if let Some(port) = port.and_then(|raw| raw.parse().ok()) {
        config.port = port;
    }
    if !dbname.is_empty() {
        config.dbname = dbname.to_string();
    }

    Ok(config)
}
/// Populates `registry` from three sources, in order: embedded definitions,
/// the spec file in the data directory, and custom files in the data
/// directory. Load failures are logged and do not abort initialization; a
/// summary line is logged at the end.
///
/// The data directory is `config.data_dir`, or `./data` when unset. The
/// registry write lock is held for the whole call.
fn initialize_search_registry(
    registry: &Arc<RwLock<SearchParameterRegistry>>,
    config: &PostgresConfig,
) {
    let loader = SearchParameterLoader::new(config.fhir_version);
    let mut reg = registry.write();

    // 1. Embedded definitions (reported as "fallback" in the summary).
    let fallback_count = match loader.load_embedded() {
        Ok(params) => params
            .into_iter()
            .map(|param| reg.register(param).is_ok())
            .filter(|&registered| registered)
            .count(),
        Err(e) => {
            tracing::error!("Failed to load embedded SearchParameters: {}", e);
            0
        }
    };

    let data_dir = match &config.data_dir {
        Some(dir) => dir.clone(),
        None => PathBuf::from("./data"),
    };

    // 2. Spec file from the data directory.
    let spec_path = data_dir.join(loader.spec_filename());
    let (spec_count, spec_file) = match loader.load_from_spec_file(&data_dir) {
        Ok(params) => {
            let registered = params
                .into_iter()
                .map(|param| reg.register(param).is_ok())
                .filter(|&ok| ok)
                .count();
            // Only mention the file in the summary if it contributed something.
            let source = if registered > 0 { Some(spec_path) } else { None };
            (registered, source)
        }
        Err(e) => {
            tracing::warn!(
                "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
                spec_path.display(),
                e
            );
            (0, None)
        }
    };

    // 3. Custom definitions from the data directory.
    let (custom_count, custom_files): (usize, Vec<String>) =
        match loader.load_custom_from_directory_with_files(&data_dir) {
            Ok((params, files)) => {
                let registered = params
                    .into_iter()
                    .map(|param| reg.register(param).is_ok())
                    .filter(|&ok| ok)
                    .count();
                (registered, files)
            }
            Err(e) => {
                tracing::warn!(
                    "Error loading custom SearchParameters from {}: {}",
                    data_dir.display(),
                    e
                );
                (0, Vec::new())
            }
        };

    let resource_type_count = reg.resource_types().len();
    let spec_info = match spec_file {
        Some(path) => format!(" from {}", path.display()),
        None => String::new(),
    };
    let custom_info = if custom_files.is_empty() {
        String::new()
    } else {
        format!(" [{}]", custom_files.join(", "))
    };

    tracing::info!(
        "PostgreSQL SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
        reg.len(),
        spec_count,
        spec_info,
        fallback_count,
        custom_count,
        custom_info,
        resource_type_count
    );
}
/// Creates or updates the database schema, then registers any
/// SearchParameters stored in the database.
///
/// # Errors
/// Propagates errors from acquiring a connection, from schema
/// initialization, and from loading stored SearchParameters.
pub async fn init_schema(&self) -> StorageResult<()> {
    {
        let client = self.get_client().await?;
        super::schema::initialize_schema(&client).await?;
        // Return the connection before the next step checks out its own:
        // holding it would hang forever on a pool with a single connection.
    }

    let stored_count = self.load_stored_search_parameters().await?;
    if stored_count > 0 {
        let total = self.search_registry.read().len();
        tracing::info!(
            "Loaded {} stored SearchParameters from database (total now: {})",
            stored_count,
            total
        );
    }
    Ok(())
}
/// Loads non-deleted `SearchParameter` resources from the `resources` table
/// and registers those whose status is `Active`, marking them as `Stored`.
///
/// Rows whose `data` column cannot be read as JSON, or that fail to parse as
/// a SearchParameter, are logged and skipped.
///
/// Returns the number of definitions that were registered successfully.
///
/// # Errors
/// Returns `ConnectionFailed` if no connection is available and `Internal`
/// if the query fails.
async fn load_stored_search_parameters(&self) -> StorageResult<usize> {
    use crate::search::registry::{SearchParameterSource, SearchParameterStatus};

    let client = self.get_client().await?;
    let rows = client
        .query(
            "SELECT data FROM resources WHERE resource_type = 'SearchParameter' AND is_deleted = FALSE",
            &[],
        )
        .await
        .map_err(|e| {
            crate::error::StorageError::Backend(BackendError::Internal {
                backend_name: "postgres".to_string(),
                message: format!("Failed to query SearchParameters: {}", e),
                source: None,
            })
        })?;
    // The rows are fully materialized; give the connection back before parsing.
    drop(client);

    let loader = SearchParameterLoader::new(self.config.fhir_version);
    let mut registry = self.search_registry.write();
    let mut count = 0;
    for row in rows {
        // `try_get` instead of `get`: a NULL or mistyped column must not panic.
        let data: serde_json::Value = match row.try_get(0) {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!("Failed to read stored SearchParameter data: {}", e);
                continue;
            }
        };
        match loader.parse_resource(&data) {
            Ok(mut def) => {
                if def.status == SearchParameterStatus::Active {
                    def.source = SearchParameterSource::Stored;
                    if registry.register(def).is_ok() {
                        count += 1;
                    }
                }
            }
            Err(e) => {
                tracing::warn!("Failed to parse stored SearchParameter: {}", e);
            }
        }
    }
    Ok(count)
}
pub(crate) async fn get_client(&self) -> StorageResult<deadpool_postgres::Client> {
self.pool.get().await.map_err(|e| {
crate::error::StorageError::Backend(BackendError::ConnectionFailed {
backend_name: "postgres".to_string(),
message: e.to_string(),
})
})
}
/// Returns a new owning handle to the shared search-parameter registry.
#[allow(dead_code)]
pub(crate) fn get_search_registry(&self) -> Arc<RwLock<SearchParameterRegistry>> {
    let shared = &self.search_registry;
    shared.clone()
}
pub fn config(&self) -> &PostgresConfig {
&self.config
}
pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
&self.search_registry
}
pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
&self.search_extractor
}
pub fn is_search_offloaded(&self) -> bool {
self.config.search_offloaded
}
/// Overwrites the `search_offloaded` configuration flag.
pub fn set_search_offloaded(&mut self, offloaded: bool) {
    let settings = &mut self.config;
    settings.search_offloaded = offloaded;
}
}
/// Newtype around a pooled `deadpool_postgres::Client`; this is the
/// `Backend::Connection` type handed out by `PostgresBackend::acquire`.
// The wrapped client is not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
pub struct PostgresConnection(pub(crate) deadpool_postgres::Client);
/// Opaque debug output: prints only the type name, never the client.
impl Debug for PostgresConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` renders as just the name.
        f.write_str("PostgresConnection")
    }
}
#[async_trait]
impl Backend for PostgresBackend {
type Connection = PostgresConnection;
/// Identifies this backend as the PostgreSQL kind.
fn kind(&self) -> BackendKind {
    const KIND: BackendKind = BackendKind::Postgres;
    KIND
}
/// Static backend name.
fn name(&self) -> &'static str {
    const NAME: &str = "postgres";
    NAME
}
/// Returns `true` for every capability this backend advertises.
///
/// The set matches the list returned by `capabilities`.
fn supports(&self, capability: BackendCapability) -> bool {
    match capability {
        // Storage and history
        BackendCapability::Crud
        | BackendCapability::Versioning
        | BackendCapability::InstanceHistory
        | BackendCapability::TypeHistory
        | BackendCapability::SystemHistory => true,
        // Search, sorting and pagination
        BackendCapability::BasicSearch
        | BackendCapability::DateSearch
        | BackendCapability::ReferenceSearch
        | BackendCapability::FullTextSearch
        | BackendCapability::Sorting
        | BackendCapability::OffsetPagination
        | BackendCapability::CursorPagination => true,
        // Transactions and locking
        BackendCapability::Transactions
        | BackendCapability::OptimisticLocking
        | BackendCapability::PessimisticLocking => true,
        // Includes
        BackendCapability::Include | BackendCapability::Revinclude => true,
        // Tenancy models
        BackendCapability::SharedSchema
        | BackendCapability::SchemaPerTenant
        | BackendCapability::DatabasePerTenant => true,
        _ => false,
    }
}
/// Lists every capability this backend advertises, in a fixed order.
fn capabilities(&self) -> Vec<BackendCapability> {
    Vec::from([
        // Storage and history
        BackendCapability::Crud,
        BackendCapability::Versioning,
        BackendCapability::InstanceHistory,
        BackendCapability::TypeHistory,
        BackendCapability::SystemHistory,
        // Search, sorting and pagination
        BackendCapability::BasicSearch,
        BackendCapability::DateSearch,
        BackendCapability::ReferenceSearch,
        BackendCapability::FullTextSearch,
        BackendCapability::Sorting,
        BackendCapability::OffsetPagination,
        BackendCapability::CursorPagination,
        // Transactions and locking
        BackendCapability::Transactions,
        BackendCapability::OptimisticLocking,
        BackendCapability::PessimisticLocking,
        // Includes
        BackendCapability::Include,
        BackendCapability::Revinclude,
        // Tenancy models
        BackendCapability::SharedSchema,
        BackendCapability::SchemaPerTenant,
        BackendCapability::DatabasePerTenant,
    ])
}
/// Checks out a pooled connection and wraps it in [`PostgresConnection`].
///
/// # Errors
/// Returns `ConnectionFailed` carrying the pool's error text.
async fn acquire(&self) -> Result<Self::Connection, BackendError> {
    match self.pool.get().await {
        Ok(client) => Ok(PostgresConnection(client)),
        Err(err) => Err(BackendError::ConnectionFailed {
            backend_name: "postgres".to_string(),
            message: err.to_string(),
        }),
    }
}
/// Releases a connection obtained from `acquire`.
///
/// Nothing is done explicitly: the connection is simply dropped when this
/// call ends.
async fn release(&self, _conn: Self::Connection) {}
/// Verifies the database is reachable by running `SELECT 1`.
///
/// # Errors
/// Returns `Unavailable` (including the pool's error text) when no
/// connection can be obtained, and `Internal` when the probe query fails.
async fn health_check(&self) -> Result<(), BackendError> {
    let client = self
        .pool
        .get()
        .await
        // Keep the underlying cause; without it operators cannot tell a
        // pool timeout from a refused connection.
        .map_err(|e| BackendError::Unavailable {
            backend_name: "postgres".to_string(),
            message: format!("Failed to get connection: {}", e),
        })?;
    client
        .query_one("SELECT 1", &[])
        .await
        .map_err(|e| BackendError::Internal {
            backend_name: "postgres".to_string(),
            message: format!("Health check failed: {}", e),
            source: None,
        })?;
    Ok(())
}
/// Initializes the backend by running `init_schema`.
///
/// # Errors
/// Any failure is reported as `Internal` with the underlying error text.
async fn initialize(&self) -> Result<(), BackendError> {
    match self.init_schema().await {
        Ok(()) => Ok(()),
        Err(err) => Err(BackendError::Internal {
            backend_name: "postgres".to_string(),
            message: format!("Failed to initialize schema: {}", err),
            source: None,
        }),
    }
}
/// Runs migrations; this delegates to `init_schema`, the same routine
/// `initialize` uses.
///
/// # Errors
/// Any failure is reported as `Internal` with the underlying error text.
async fn migrate(&self) -> Result<(), BackendError> {
    match self.init_schema().await {
        Ok(()) => Ok(()),
        Err(err) => Err(BackendError::Internal {
            backend_name: "postgres".to_string(),
            message: format!("Failed to run migrations: {}", err),
            source: None,
        }),
    }
}
}
use crate::core::capabilities::{
GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
};
use crate::types::{
IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
SearchParamType, SpecialSearchParam,
};
impl SearchCapabilityProvider for PostgresBackend {
/// Describes the search capabilities available for `resource_type`.
///
/// The parameter list is built from the active parameters registered for
/// `resource_type`, followed by the active parameters registered for the
/// base `"Resource"` type whose code is not already present. Each entry
/// carries the modifiers from `modifiers_for_type`; type-specific
/// parameters also carry their reference targets when defined.
///
/// Returns `None` when neither the resource type nor `"Resource"` has any
/// active parameters.
fn resource_search_capabilities(
    &self,
    resource_type: &str,
) -> Option<ResourceSearchCapabilities> {
    // Take the read lock once so both lists come from the same registry state.
    let (params, common_params) = {
        let registry = self.search_registry.read();
        (
            registry.get_active_params(resource_type),
            registry.get_active_params("Resource"),
        )
    };
    if params.is_empty() && common_params.is_empty() {
        return None;
    }

    let mut search_params = Vec::new();
    for param in &params {
        let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
            .with_definition(&param.url)
            .with_modifiers(Self::modifiers_for_type(param.param_type));
        if let Some(ref targets) = param.target {
            cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
        }
        search_params.push(cap);
    }

    // Type-specific definitions win over base-resource ones with the same code.
    for param in &common_params {
        if search_params.iter().any(|p| p.name == param.code) {
            continue;
        }
        let cap = SearchParamFullCapability::new(&param.code, param.param_type)
            .with_definition(&param.url)
            .with_modifiers(Self::modifiers_for_type(param.param_type));
        search_params.push(cap);
    }

    Some(
        ResourceSearchCapabilities::new(resource_type)
            .with_special_params(vec![
                SpecialSearchParam::Id,
                SpecialSearchParam::LastUpdated,
                SpecialSearchParam::Tag,
                SpecialSearchParam::Profile,
                SpecialSearchParam::Security,
            ])
            .with_include_capabilities(vec![
                IncludeCapability::Include,
                IncludeCapability::Revinclude,
            ])
            .with_pagination_capabilities(vec![
                PaginationCapability::Count,
                PaginationCapability::Offset,
                PaginationCapability::Cursor,
                PaginationCapability::MaxPageSize(1000),
                PaginationCapability::DefaultPageSize(20),
            ])
            .with_result_mode_capabilities(vec![
                ResultModeCapability::Total,
                ResultModeCapability::TotalNone,
                ResultModeCapability::TotalAccurate,
                ResultModeCapability::SummaryCount,
            ])
            .with_param_list(search_params),
    )
}
/// Describes the system-level search capabilities: the supported special
/// parameters, the pagination options, and system search enabled.
fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
    let special_params = vec![
        SpecialSearchParam::Id,
        SpecialSearchParam::LastUpdated,
        SpecialSearchParam::Tag,
        SpecialSearchParam::Profile,
        SpecialSearchParam::Security,
    ];
    let pagination = vec![
        PaginationCapability::Count,
        PaginationCapability::Offset,
        PaginationCapability::Cursor,
        PaginationCapability::MaxPageSize(1000),
        PaginationCapability::DefaultPageSize(20),
    ];

    GlobalSearchCapabilities::new()
        .with_special_params(special_params)
        .with_pagination(pagination)
        .with_system_search()
}
}
impl PostgresBackend {
/// Returns the search modifiers advertised for a parameter type.
///
/// `Special` parameters advertise no modifiers.
fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
    // Pick a static table per type, then copy it into the owned return value.
    let modifiers: &[&'static str] = match param_type {
        SearchParamType::String => &["exact", "contains", "missing"],
        SearchParamType::Token => &["not", "text", "in", "not-in", "of-type", "missing"],
        SearchParamType::Reference => &["identifier", "missing"],
        SearchParamType::Uri => &["below", "above", "missing"],
        SearchParamType::Date
        | SearchParamType::Number
        | SearchParamType::Quantity
        | SearchParamType::Composite => &["missing"],
        SearchParamType::Special => &[],
    };
    modifiers.to_vec()
}
}