1use std::fmt::Debug;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use deadpool_postgres::{Config, Pool, Runtime, SslMode};
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio_postgres::NoTls;
12
13use helios_fhir::FhirVersion;
14
15use crate::core::{Backend, BackendCapability, BackendKind};
16use crate::error::{BackendError, StorageResult};
17use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
18
19pub struct PostgresBackend {
21 pool: Pool,
22 config: PostgresConfig,
23 search_registry: Arc<RwLock<SearchParameterRegistry>>,
25 search_extractor: Arc<SearchParameterExtractor>,
27}
28
29impl Debug for PostgresBackend {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("PostgresBackend")
32 .field("config", &self.config)
33 .field("search_registry_len", &self.search_registry.read().len())
34 .finish_non_exhaustive()
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PostgresConfig {
41 #[serde(default = "default_host")]
43 pub host: String,
44
45 #[serde(default = "default_port")]
47 pub port: u16,
48
49 #[serde(default = "default_dbname")]
51 pub dbname: String,
52
53 #[serde(default = "default_user")]
55 pub user: String,
56
57 #[serde(default)]
59 pub password: Option<String>,
60
61 #[serde(default)]
63 pub ssl_mode: PostgresSslMode,
64
65 #[serde(default = "default_max_connections")]
67 pub max_connections: usize,
68
69 #[serde(default = "default_connect_timeout_secs")]
71 pub connect_timeout_secs: u64,
72
73 #[serde(default = "default_statement_timeout_ms")]
75 pub statement_timeout_ms: u64,
76
77 #[serde(default)]
79 pub fhir_version: FhirVersion,
80
81 #[serde(default)]
83 pub data_dir: Option<PathBuf>,
84
85 #[serde(default)]
87 pub search_offloaded: bool,
88
89 #[serde(default)]
91 pub schema_name: Option<String>,
92}
93
94#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
96#[serde(rename_all = "lowercase")]
97pub enum PostgresSslMode {
98 Disable,
100 #[default]
102 Prefer,
103 Require,
105}
106
/// Serde/`Default` fallback for `PostgresConfig::host`.
fn default_host() -> String {
    String::from("localhost")
}
110
/// Serde/`Default` fallback for `PostgresConfig::port`.
fn default_port() -> u16 {
    const PORT: u16 = 5432;
    PORT
}
114
/// Serde/`Default` fallback for `PostgresConfig::dbname`.
fn default_dbname() -> String {
    String::from("helios")
}
118
/// Serde/`Default` fallback for `PostgresConfig::user`.
fn default_user() -> String {
    "helios".to_owned()
}
122
/// Serde/`Default` fallback for `PostgresConfig::max_connections`.
fn default_max_connections() -> usize {
    const MAX_CONNECTIONS: usize = 10;
    MAX_CONNECTIONS
}
126
/// Serde/`Default` fallback for `PostgresConfig::connect_timeout_secs` (seconds).
fn default_connect_timeout_secs() -> u64 {
    const SECONDS: u64 = 5;
    SECONDS
}
130
/// Serde/`Default` fallback for `PostgresConfig::statement_timeout_ms` (milliseconds).
fn default_statement_timeout_ms() -> u64 {
    const MILLIS: u64 = 30_000;
    MILLIS
}
134
135impl Default for PostgresConfig {
136 fn default() -> Self {
137 Self {
138 host: default_host(),
139 port: default_port(),
140 dbname: default_dbname(),
141 user: default_user(),
142 password: None,
143 ssl_mode: PostgresSslMode::default(),
144 max_connections: default_max_connections(),
145 connect_timeout_secs: default_connect_timeout_secs(),
146 statement_timeout_ms: default_statement_timeout_ms(),
147 fhir_version: FhirVersion::default(),
148 data_dir: None,
149 search_offloaded: false,
150 schema_name: None,
151 }
152 }
153}
154
155impl PostgresBackend {
156 pub async fn new(config: PostgresConfig) -> StorageResult<Self> {
158 let pool = Self::create_pool(&config)?;
159
160 let client = pool.get().await.map_err(|e| {
162 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
163 backend_name: "postgres".to_string(),
164 message: e.to_string(),
165 })
166 })?;
167
168 client
170 .execute(
171 &format!("SET statement_timeout = {}", config.statement_timeout_ms),
172 &[],
173 )
174 .await
175 .map_err(|e| {
176 crate::error::StorageError::Backend(BackendError::Internal {
177 backend_name: "postgres".to_string(),
178 message: format!("Failed to set statement_timeout: {}", e),
179 source: None,
180 })
181 })?;
182
183 drop(client);
184
185 let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
187 Self::initialize_search_registry(&search_registry, &config);
188 let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
189
190 Ok(Self {
191 pool,
192 config,
193 search_registry,
194 search_extractor,
195 })
196 }
197
198 pub async fn from_connection_string(url: &str) -> StorageResult<Self> {
200 let config = Self::parse_connection_string(url)?;
201 Self::new(config).await
202 }
203
204 pub async fn from_env() -> StorageResult<Self> {
214 let config = PostgresConfig {
215 host: std::env::var("HFS_PG_HOST").unwrap_or_else(|_| default_host()),
216 port: std::env::var("HFS_PG_PORT")
217 .ok()
218 .and_then(|p| p.parse().ok())
219 .unwrap_or_else(default_port),
220 dbname: std::env::var("HFS_PG_DBNAME").unwrap_or_else(|_| default_dbname()),
221 user: std::env::var("HFS_PG_USER").unwrap_or_else(|_| default_user()),
222 password: std::env::var("HFS_PG_PASSWORD").ok(),
223 max_connections: std::env::var("HFS_PG_MAX_CONNECTIONS")
224 .ok()
225 .and_then(|p| p.parse().ok())
226 .unwrap_or_else(default_max_connections),
227 ..Default::default()
228 };
229 Self::new(config).await
230 }
231
232 fn create_pool(config: &PostgresConfig) -> StorageResult<Pool> {
233 let mut cfg = Config::new();
234 cfg.host = Some(config.host.clone());
235 cfg.port = Some(config.port);
236 cfg.dbname = Some(config.dbname.clone());
237 cfg.user = Some(config.user.clone());
238 cfg.password = config.password.clone();
239 cfg.ssl_mode = Some(match config.ssl_mode {
240 PostgresSslMode::Disable => SslMode::Disable,
241 PostgresSslMode::Prefer => SslMode::Prefer,
242 PostgresSslMode::Require => SslMode::Require,
243 });
244
245 let pool = cfg
246 .builder(NoTls)
247 .map_err(|e| {
248 crate::error::StorageError::Backend(BackendError::Internal {
249 backend_name: "postgres".to_string(),
250 message: format!("Failed to create pool builder: {}", e),
251 source: None,
252 })
253 })?
254 .max_size(config.max_connections)
255 .runtime(Runtime::Tokio1)
256 .build()
257 .map_err(|e| {
258 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
259 backend_name: "postgres".to_string(),
260 message: e.to_string(),
261 })
262 })?;
263
264 Ok(pool)
265 }
266
267 fn parse_connection_string(url: &str) -> StorageResult<PostgresConfig> {
268 let url = url
271 .strip_prefix("postgres://")
272 .or_else(|| url.strip_prefix("postgresql://"))
273 .unwrap_or(url);
274
275 let mut config = PostgresConfig::default();
276
277 if let Some((userinfo, rest)) = url.split_once('@') {
279 if let Some((user, password)) = userinfo.split_once(':') {
280 config.user = user.to_string();
281 config.password = Some(password.to_string());
282 } else {
283 config.user = userinfo.to_string();
284 }
285
286 if let Some((hostport, dbname)) = rest.split_once('/') {
287 if let Some((host, port)) = hostport.split_once(':') {
288 config.host = host.to_string();
289 config.port = port.parse().unwrap_or(5432);
290 } else {
291 config.host = hostport.to_string();
292 }
293 config.dbname = dbname.to_string();
294 } else if let Some((host, port)) = rest.split_once(':') {
295 config.host = host.to_string();
296 config.port = port.parse().unwrap_or(5432);
297 } else {
298 config.host = rest.to_string();
299 }
300 }
301
302 Ok(config)
303 }
304
305 fn initialize_search_registry(
306 registry: &Arc<RwLock<SearchParameterRegistry>>,
307 config: &PostgresConfig,
308 ) {
309 let loader = SearchParameterLoader::new(config.fhir_version);
310 let mut reg = registry.write();
311
312 let mut fallback_count = 0;
313 let mut spec_count = 0;
314 let mut spec_file: Option<PathBuf> = None;
315 let mut custom_count = 0;
316 let mut custom_files: Vec<String> = Vec::new();
317
318 match loader.load_embedded() {
320 Ok(params) => {
321 for param in params {
322 if reg.register(param).is_ok() {
323 fallback_count += 1;
324 }
325 }
326 }
327 Err(e) => {
328 tracing::error!("Failed to load embedded SearchParameters: {}", e);
329 }
330 }
331
332 let data_dir = config
334 .data_dir
335 .clone()
336 .unwrap_or_else(|| PathBuf::from("./data"));
337 let spec_filename = loader.spec_filename();
338 let spec_path = data_dir.join(spec_filename);
339 match loader.load_from_spec_file(&data_dir) {
340 Ok(params) => {
341 for param in params {
342 if reg.register(param).is_ok() {
343 spec_count += 1;
344 }
345 }
346 if spec_count > 0 {
347 spec_file = Some(spec_path);
348 }
349 }
350 Err(e) => {
351 tracing::warn!(
352 "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
353 spec_path.display(),
354 e
355 );
356 }
357 }
358
359 match loader.load_custom_from_directory_with_files(&data_dir) {
361 Ok((params, files)) => {
362 for param in params {
363 if reg.register(param).is_ok() {
364 custom_count += 1;
365 }
366 }
367 custom_files = files;
368 }
369 Err(e) => {
370 tracing::warn!(
371 "Error loading custom SearchParameters from {}: {}",
372 data_dir.display(),
373 e
374 );
375 }
376 }
377
378 let resource_type_count = reg.resource_types().len();
379 let spec_info = spec_file
380 .map(|p| format!(" from {}", p.display()))
381 .unwrap_or_default();
382 let custom_info = if custom_files.is_empty() {
383 String::new()
384 } else {
385 format!(" [{}]", custom_files.join(", "))
386 };
387 tracing::info!(
388 "PostgreSQL SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
389 reg.len(),
390 spec_count,
391 spec_info,
392 fallback_count,
393 custom_count,
394 custom_info,
395 resource_type_count
396 );
397 }
398
399 pub async fn init_schema(&self) -> StorageResult<()> {
401 let client = self.get_client().await?;
402 super::schema::initialize_schema(&client).await?;
403
404 let stored_count = self.load_stored_search_parameters().await?;
406 if stored_count > 0 {
407 let registry = self.search_registry.read();
408 tracing::info!(
409 "Loaded {} stored SearchParameters from database (total now: {})",
410 stored_count,
411 registry.len()
412 );
413 }
414
415 Ok(())
416 }
417
418 async fn load_stored_search_parameters(&self) -> StorageResult<usize> {
420 use crate::search::registry::{SearchParameterSource, SearchParameterStatus};
421
422 let client = self.get_client().await?;
423 let rows = client
424 .query(
425 "SELECT data FROM resources WHERE resource_type = 'SearchParameter' AND is_deleted = FALSE",
426 &[],
427 )
428 .await
429 .map_err(|e| {
430 crate::error::StorageError::Backend(BackendError::Internal {
431 backend_name: "postgres".to_string(),
432 message: format!("Failed to query SearchParameters: {}", e),
433 source: None,
434 })
435 })?;
436
437 let loader = SearchParameterLoader::new(self.config.fhir_version);
438 let mut registry = self.search_registry.write();
439 let mut count = 0;
440
441 for row in rows {
442 let data: serde_json::Value = row.get(0);
443 match loader.parse_resource(&data) {
444 Ok(mut def) => {
445 if def.status == SearchParameterStatus::Active {
446 def.source = SearchParameterSource::Stored;
447 if registry.register(def).is_ok() {
448 count += 1;
449 }
450 }
451 }
452 Err(e) => {
453 tracing::warn!("Failed to parse stored SearchParameter: {}", e);
454 }
455 }
456 }
457
458 Ok(count)
459 }
460
461 pub(crate) async fn get_client(&self) -> StorageResult<deadpool_postgres::Client> {
463 self.pool.get().await.map_err(|e| {
464 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
465 backend_name: "postgres".to_string(),
466 message: e.to_string(),
467 })
468 })
469 }
470
471 #[allow(dead_code)]
473 pub(crate) fn get_search_registry(&self) -> Arc<RwLock<SearchParameterRegistry>> {
474 Arc::clone(&self.search_registry)
475 }
476
477 pub fn config(&self) -> &PostgresConfig {
479 &self.config
480 }
481
482 pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
484 &self.search_registry
485 }
486
487 pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
489 &self.search_extractor
490 }
491
492 pub fn is_search_offloaded(&self) -> bool {
494 self.config.search_offloaded
495 }
496
497 pub fn set_search_offloaded(&mut self, offloaded: bool) {
499 self.config.search_offloaded = offloaded;
500 }
501}
502
503#[allow(dead_code)]
505pub struct PostgresConnection(pub(crate) deadpool_postgres::Client);
506
507impl Debug for PostgresConnection {
508 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509 f.debug_struct("PostgresConnection").finish()
510 }
511}
512
513#[async_trait]
514impl Backend for PostgresBackend {
515 type Connection = PostgresConnection;
516
517 fn kind(&self) -> BackendKind {
518 BackendKind::Postgres
519 }
520
521 fn name(&self) -> &'static str {
522 "postgres"
523 }
524
525 fn supports(&self, capability: BackendCapability) -> bool {
526 matches!(
527 capability,
528 BackendCapability::Crud
529 | BackendCapability::Versioning
530 | BackendCapability::InstanceHistory
531 | BackendCapability::TypeHistory
532 | BackendCapability::SystemHistory
533 | BackendCapability::BasicSearch
534 | BackendCapability::DateSearch
535 | BackendCapability::ReferenceSearch
536 | BackendCapability::FullTextSearch
537 | BackendCapability::Sorting
538 | BackendCapability::OffsetPagination
539 | BackendCapability::CursorPagination
540 | BackendCapability::Transactions
541 | BackendCapability::OptimisticLocking
542 | BackendCapability::PessimisticLocking
543 | BackendCapability::Include
544 | BackendCapability::Revinclude
545 | BackendCapability::SharedSchema
546 | BackendCapability::SchemaPerTenant
547 | BackendCapability::DatabasePerTenant
548 )
549 }
550
551 fn capabilities(&self) -> Vec<BackendCapability> {
552 vec![
553 BackendCapability::Crud,
554 BackendCapability::Versioning,
555 BackendCapability::InstanceHistory,
556 BackendCapability::TypeHistory,
557 BackendCapability::SystemHistory,
558 BackendCapability::BasicSearch,
559 BackendCapability::DateSearch,
560 BackendCapability::ReferenceSearch,
561 BackendCapability::FullTextSearch,
562 BackendCapability::Sorting,
563 BackendCapability::OffsetPagination,
564 BackendCapability::CursorPagination,
565 BackendCapability::Transactions,
566 BackendCapability::OptimisticLocking,
567 BackendCapability::PessimisticLocking,
568 BackendCapability::Include,
569 BackendCapability::Revinclude,
570 BackendCapability::SharedSchema,
571 BackendCapability::SchemaPerTenant,
572 BackendCapability::DatabasePerTenant,
573 ]
574 }
575
576 async fn acquire(&self) -> Result<Self::Connection, BackendError> {
577 let client = self
578 .pool
579 .get()
580 .await
581 .map_err(|e| BackendError::ConnectionFailed {
582 backend_name: "postgres".to_string(),
583 message: e.to_string(),
584 })?;
585 Ok(PostgresConnection(client))
586 }
587
588 async fn release(&self, _conn: Self::Connection) {
589 }
591
592 async fn health_check(&self) -> Result<(), BackendError> {
593 let client = self
594 .pool
595 .get()
596 .await
597 .map_err(|_| BackendError::Unavailable {
598 backend_name: "postgres".to_string(),
599 message: "Failed to get connection".to_string(),
600 })?;
601 client
602 .query_one("SELECT 1", &[])
603 .await
604 .map_err(|e| BackendError::Internal {
605 backend_name: "postgres".to_string(),
606 message: format!("Health check failed: {}", e),
607 source: None,
608 })?;
609 Ok(())
610 }
611
612 async fn initialize(&self) -> Result<(), BackendError> {
613 self.init_schema()
614 .await
615 .map_err(|e| BackendError::Internal {
616 backend_name: "postgres".to_string(),
617 message: format!("Failed to initialize schema: {}", e),
618 source: None,
619 })
620 }
621
622 async fn migrate(&self) -> Result<(), BackendError> {
623 self.init_schema()
624 .await
625 .map_err(|e| BackendError::Internal {
626 backend_name: "postgres".to_string(),
627 message: format!("Failed to run migrations: {}", e),
628 source: None,
629 })
630 }
631}
632
633use crate::core::capabilities::{
638 GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
639};
640use crate::types::{
641 IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
642 SearchParamType, SpecialSearchParam,
643};
644
645impl SearchCapabilityProvider for PostgresBackend {
646 fn resource_search_capabilities(
647 &self,
648 resource_type: &str,
649 ) -> Option<ResourceSearchCapabilities> {
650 let params = {
651 let registry = self.search_registry.read();
652 registry.get_active_params(resource_type)
653 };
654
655 if params.is_empty() {
656 let common_params = {
657 let registry = self.search_registry.read();
658 registry.get_active_params("Resource")
659 };
660 if common_params.is_empty() {
661 return None;
662 }
663 }
664
665 let mut search_params = Vec::new();
666 for param in ¶ms {
667 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
668 .with_definition(¶m.url);
669 let modifiers = Self::modifiers_for_type(param.param_type);
670 cap = cap.with_modifiers(modifiers);
671 if let Some(ref targets) = param.target {
672 cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
673 }
674 search_params.push(cap);
675 }
676
677 let common_params = {
678 let registry = self.search_registry.read();
679 registry.get_active_params("Resource")
680 };
681 for param in &common_params {
682 if !search_params.iter().any(|p| p.name == param.code) {
683 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
684 .with_definition(¶m.url);
685 cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
686 search_params.push(cap);
687 }
688 }
689
690 Some(
691 ResourceSearchCapabilities::new(resource_type)
692 .with_special_params(vec![
693 SpecialSearchParam::Id,
694 SpecialSearchParam::LastUpdated,
695 SpecialSearchParam::Tag,
696 SpecialSearchParam::Profile,
697 SpecialSearchParam::Security,
698 ])
699 .with_include_capabilities(vec![
700 IncludeCapability::Include,
701 IncludeCapability::Revinclude,
702 ])
703 .with_pagination_capabilities(vec![
704 PaginationCapability::Count,
705 PaginationCapability::Offset,
706 PaginationCapability::Cursor,
707 PaginationCapability::MaxPageSize(1000),
708 PaginationCapability::DefaultPageSize(20),
709 ])
710 .with_result_mode_capabilities(vec![
711 ResultModeCapability::Total,
712 ResultModeCapability::TotalNone,
713 ResultModeCapability::TotalAccurate,
714 ResultModeCapability::SummaryCount,
715 ])
716 .with_param_list(search_params),
717 )
718 }
719
720 fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
721 GlobalSearchCapabilities::new()
722 .with_special_params(vec![
723 SpecialSearchParam::Id,
724 SpecialSearchParam::LastUpdated,
725 SpecialSearchParam::Tag,
726 SpecialSearchParam::Profile,
727 SpecialSearchParam::Security,
728 ])
729 .with_pagination(vec![
730 PaginationCapability::Count,
731 PaginationCapability::Offset,
732 PaginationCapability::Cursor,
733 PaginationCapability::MaxPageSize(1000),
734 PaginationCapability::DefaultPageSize(20),
735 ])
736 .with_system_search()
737 }
738}
739
740impl PostgresBackend {
741 fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
743 match param_type {
744 SearchParamType::String => vec!["exact", "contains", "missing"],
745 SearchParamType::Token => vec!["not", "text", "in", "not-in", "of-type", "missing"],
746 SearchParamType::Reference => vec!["identifier", "missing"],
747 SearchParamType::Date => vec!["missing"],
748 SearchParamType::Number => vec!["missing"],
749 SearchParamType::Quantity => vec!["missing"],
750 SearchParamType::Uri => vec!["below", "above", "missing"],
751 SearchParamType::Composite => vec!["missing"],
752 SearchParamType::Special => vec![],
753 }
754 }
755}