1use std::fmt::Debug;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use deadpool_postgres::{Config, Pool, Runtime, SslMode};
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio_postgres::NoTls;
12
13use helios_fhir::FhirVersion;
14
15use crate::core::{Backend, BackendCapability, BackendKind};
16use crate::error::{BackendError, StorageResult};
17use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
18
/// PostgreSQL-backed storage backend.
///
/// Bundles the connection pool with the configuration it was built from and
/// the shared SearchParameter registry/extractor pair.
pub struct PostgresBackend {
    /// Connection pool that hands out database clients.
    pool: Pool,
    /// Configuration this backend was constructed with.
    config: PostgresConfig,
    /// Shared, lock-protected registry of SearchParameter definitions.
    search_registry: Arc<RwLock<SearchParameterRegistry>>,
    /// Extractor built from a clone of the `search_registry` handle.
    search_extractor: Arc<SearchParameterExtractor>,
}
28
29impl Debug for PostgresBackend {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("PostgresBackend")
32 .field("config", &self.config)
33 .field("search_registry_len", &self.search_registry.read().len())
34 .finish_non_exhaustive()
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PostgresConfig {
41 #[serde(default = "default_host")]
43 pub host: String,
44
45 #[serde(default = "default_port")]
47 pub port: u16,
48
49 #[serde(default = "default_dbname")]
51 pub dbname: String,
52
53 #[serde(default = "default_user")]
55 pub user: String,
56
57 #[serde(default)]
59 pub password: Option<String>,
60
61 #[serde(default)]
63 pub ssl_mode: PostgresSslMode,
64
65 #[serde(default = "default_max_connections")]
67 pub max_connections: usize,
68
69 #[serde(default = "default_connect_timeout_secs")]
71 pub connect_timeout_secs: u64,
72
73 #[serde(default = "default_statement_timeout_ms")]
75 pub statement_timeout_ms: u64,
76
77 #[serde(default = "crate::default_fhir_version")]
79 pub fhir_version: FhirVersion,
80
81 #[serde(default)]
83 pub data_dir: Option<PathBuf>,
84
85 #[serde(default)]
87 pub search_offloaded: bool,
88
89 #[serde(default)]
91 pub schema_name: Option<String>,
92}
93
/// TLS negotiation mode requested for PostgreSQL connections.
///
/// Serialized in lowercase (`disable`, `prefer`, `require`); `Prefer` is the
/// default variant.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum PostgresSslMode {
    /// Mapped to `SslMode::Disable` when the pool is created.
    Disable,
    /// Mapped to `SslMode::Prefer` when the pool is created.
    #[default]
    Prefer,
    /// Mapped to `SslMode::Require` when the pool is created.
    Require,
}
106
/// Host name used when the configuration does not provide one.
fn default_host() -> String {
    String::from("localhost")
}
110
/// TCP port used when the configuration does not provide one.
fn default_port() -> u16 {
    const POSTGRES_PORT: u16 = 5432;
    POSTGRES_PORT
}
114
/// Database name used when the configuration does not provide one.
fn default_dbname() -> String {
    String::from("helios")
}
118
/// Login role used when the configuration does not provide one.
fn default_user() -> String {
    String::from("helios")
}
122
/// Pool size used when the configuration does not provide one.
fn default_max_connections() -> usize {
    const MAX_CONNECTIONS: usize = 10;
    MAX_CONNECTIONS
}
126
/// Connection timeout (seconds) used when the configuration does not provide one.
fn default_connect_timeout_secs() -> u64 {
    const CONNECT_TIMEOUT_SECS: u64 = 5;
    CONNECT_TIMEOUT_SECS
}
130
/// Statement timeout (milliseconds) used when the configuration does not provide one.
fn default_statement_timeout_ms() -> u64 {
    const STATEMENT_TIMEOUT_MS: u64 = 30_000;
    STATEMENT_TIMEOUT_MS
}
134
// Programmatic defaults; each value comes from the same `default_*` helper
// that serde uses for the corresponding field, except where noted.
impl Default for PostgresConfig {
    fn default() -> Self {
        Self {
            host: default_host(),
            port: default_port(),
            dbname: default_dbname(),
            user: default_user(),
            password: None,
            ssl_mode: PostgresSslMode::default(),
            max_connections: default_max_connections(),
            connect_timeout_secs: default_connect_timeout_secs(),
            statement_timeout_ms: default_statement_timeout_ms(),
            // NOTE(review): serde uses `crate::default_fhir_version` for this field,
            // while this impl uses `FhirVersion::default_enabled()` — confirm they agree.
            fhir_version: FhirVersion::default_enabled(),
            data_dir: None,
            search_offloaded: false,
            schema_name: None,
        }
    }
}
154
155impl PostgresBackend {
156 pub async fn new(config: PostgresConfig) -> StorageResult<Self> {
158 let pool = Self::create_pool(&config)?;
159
160 let client = pool.get().await.map_err(|e| {
162 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
163 backend_name: "postgres".to_string(),
164 message: e.to_string(),
165 })
166 })?;
167
168 client
170 .execute(
171 &format!("SET statement_timeout = {}", config.statement_timeout_ms),
172 &[],
173 )
174 .await
175 .map_err(|e| {
176 crate::error::StorageError::Backend(BackendError::Internal {
177 backend_name: "postgres".to_string(),
178 message: format!("Failed to set statement_timeout: {}", e),
179 source: None,
180 })
181 })?;
182
183 drop(client);
184
185 let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
187 Self::initialize_search_registry(&search_registry, &config);
188 let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
189
190 Ok(Self {
191 pool,
192 config,
193 search_registry,
194 search_extractor,
195 })
196 }
197
198 pub async fn from_connection_string(url: &str) -> StorageResult<Self> {
200 let config = Self::parse_connection_string(url)?;
201 Self::new(config).await
202 }
203
204 pub async fn from_env() -> StorageResult<Self> {
214 let config = PostgresConfig {
215 host: std::env::var("HFS_PG_HOST").unwrap_or_else(|_| default_host()),
216 port: std::env::var("HFS_PG_PORT")
217 .ok()
218 .and_then(|p| p.parse().ok())
219 .unwrap_or_else(default_port),
220 dbname: std::env::var("HFS_PG_DBNAME").unwrap_or_else(|_| default_dbname()),
221 user: std::env::var("HFS_PG_USER").unwrap_or_else(|_| default_user()),
222 password: std::env::var("HFS_PG_PASSWORD").ok(),
223 max_connections: std::env::var("HFS_PG_MAX_CONNECTIONS")
224 .ok()
225 .and_then(|p| p.parse().ok())
226 .unwrap_or_else(default_max_connections),
227 ..Default::default()
228 };
229 Self::new(config).await
230 }
231
232 fn create_pool(config: &PostgresConfig) -> StorageResult<Pool> {
233 let mut cfg = Config::new();
234 cfg.host = Some(config.host.clone());
235 cfg.port = Some(config.port);
236 cfg.dbname = Some(config.dbname.clone());
237 cfg.user = Some(config.user.clone());
238 cfg.password = config.password.clone();
239 cfg.ssl_mode = Some(match config.ssl_mode {
240 PostgresSslMode::Disable => SslMode::Disable,
241 PostgresSslMode::Prefer => SslMode::Prefer,
242 PostgresSslMode::Require => SslMode::Require,
243 });
244
245 let pool = cfg
246 .builder(NoTls)
247 .map_err(|e| {
248 crate::error::StorageError::Backend(BackendError::Internal {
249 backend_name: "postgres".to_string(),
250 message: format!("Failed to create pool builder: {}", e),
251 source: None,
252 })
253 })?
254 .max_size(config.max_connections)
255 .runtime(Runtime::Tokio1)
256 .build()
257 .map_err(|e| {
258 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
259 backend_name: "postgres".to_string(),
260 message: e.to_string(),
261 })
262 })?;
263
264 Ok(pool)
265 }
266
267 fn parse_connection_string(url: &str) -> StorageResult<PostgresConfig> {
268 let url = url
271 .strip_prefix("postgres://")
272 .or_else(|| url.strip_prefix("postgresql://"))
273 .unwrap_or(url);
274
275 let mut config = PostgresConfig::default();
276
277 if let Some((userinfo, rest)) = url.split_once('@') {
279 if let Some((user, password)) = userinfo.split_once(':') {
280 config.user = user.to_string();
281 config.password = Some(password.to_string());
282 } else {
283 config.user = userinfo.to_string();
284 }
285
286 if let Some((hostport, dbname)) = rest.split_once('/') {
287 if let Some((host, port)) = hostport.split_once(':') {
288 config.host = host.to_string();
289 config.port = port.parse().unwrap_or(5432);
290 } else {
291 config.host = hostport.to_string();
292 }
293 config.dbname = dbname.to_string();
294 } else if let Some((host, port)) = rest.split_once(':') {
295 config.host = host.to_string();
296 config.port = port.parse().unwrap_or(5432);
297 } else {
298 config.host = rest.to_string();
299 }
300 }
301
302 Ok(config)
303 }
304
305 fn initialize_search_registry(
306 registry: &Arc<RwLock<SearchParameterRegistry>>,
307 config: &PostgresConfig,
308 ) {
309 let loader = SearchParameterLoader::new(config.fhir_version);
310 let mut reg = registry.write();
311
312 let mut fallback_count = 0;
313 let mut spec_count = 0;
314 let mut spec_file: Option<PathBuf> = None;
315 let mut custom_count = 0;
316 let mut custom_files: Vec<String> = Vec::new();
317
318 match loader.load_embedded() {
320 Ok(params) => {
321 for param in params {
322 if reg.register(param).is_ok() {
323 fallback_count += 1;
324 }
325 }
326 }
327 Err(e) => {
328 tracing::error!("Failed to load embedded SearchParameters: {}", e);
329 }
330 }
331
332 let data_dir = config
334 .data_dir
335 .clone()
336 .unwrap_or_else(|| PathBuf::from("./data"));
337 let spec_filename = loader.spec_filename();
338 let spec_path = data_dir.join(spec_filename);
339 match loader.load_from_spec_file(&data_dir) {
340 Ok(params) => {
341 for param in params {
342 if reg.register(param).is_ok() {
343 spec_count += 1;
344 }
345 }
346 if spec_count > 0 {
347 spec_file = Some(spec_path);
348 }
349 }
350 Err(e) => {
351 tracing::warn!(
352 "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
353 spec_path.display(),
354 e
355 );
356 }
357 }
358
359 match loader.load_custom_from_directory_with_files(&data_dir) {
361 Ok((params, files)) => {
362 for param in params {
363 if reg.register(param).is_ok() {
364 custom_count += 1;
365 }
366 }
367 custom_files = files;
368 }
369 Err(e) => {
370 tracing::warn!(
371 "Error loading custom SearchParameters from {}: {}",
372 data_dir.display(),
373 e
374 );
375 }
376 }
377
378 let resource_type_count = reg.resource_types().len();
379 let spec_info = spec_file
380 .map(|p| format!(" from {}", p.display()))
381 .unwrap_or_default();
382 let custom_info = if custom_files.is_empty() {
383 String::new()
384 } else {
385 format!(" [{}]", custom_files.join(", "))
386 };
387 tracing::info!(
388 "PostgreSQL SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
389 reg.len(),
390 spec_count,
391 spec_info,
392 fallback_count,
393 custom_count,
394 custom_info,
395 resource_type_count
396 );
397 }
398
399 pub async fn init_schema(&self) -> StorageResult<()> {
401 let client = self.get_client().await?;
402 super::schema::initialize_schema(&client).await?;
403
404 let stored_count = self.load_stored_search_parameters().await?;
406 if stored_count > 0 {
407 let registry = self.search_registry.read();
408 tracing::info!(
409 "Loaded {} stored SearchParameters from database (total now: {})",
410 stored_count,
411 registry.len()
412 );
413 }
414
415 Ok(())
416 }
417
418 async fn load_stored_search_parameters(&self) -> StorageResult<usize> {
420 use crate::search::registry::{SearchParameterSource, SearchParameterStatus};
421
422 let client = self.get_client().await?;
423 let rows = client
424 .query(
425 "SELECT data FROM resources WHERE resource_type = 'SearchParameter' AND is_deleted = FALSE",
426 &[],
427 )
428 .await
429 .map_err(|e| {
430 crate::error::StorageError::Backend(BackendError::Internal {
431 backend_name: "postgres".to_string(),
432 message: format!("Failed to query SearchParameters: {}", e),
433 source: None,
434 })
435 })?;
436
437 let loader = SearchParameterLoader::new(self.config.fhir_version);
438 let mut registry = self.search_registry.write();
439 let mut count = 0;
440
441 for row in rows {
442 let data: serde_json::Value = row.get(0);
443 match loader.parse_resource(&data) {
444 Ok(mut def) => {
445 if def.status == SearchParameterStatus::Active {
446 def.source = SearchParameterSource::Stored;
447 if registry.register(def).is_ok() {
448 count += 1;
449 }
450 }
451 }
452 Err(e) => {
453 tracing::warn!("Failed to parse stored SearchParameter: {}", e);
454 }
455 }
456 }
457
458 Ok(count)
459 }
460
461 pub(crate) async fn get_client(&self) -> StorageResult<deadpool_postgres::Client> {
463 self.pool.get().await.map_err(|e| {
464 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
465 backend_name: "postgres".to_string(),
466 message: e.to_string(),
467 })
468 })
469 }
470
    /// Returns a new `Arc` handle to the shared SearchParameter registry.
    #[allow(dead_code)]
    pub(crate) fn get_search_registry(&self) -> Arc<RwLock<SearchParameterRegistry>> {
        Arc::clone(&self.search_registry)
    }
476
    /// Returns the configuration this backend was built with.
    pub fn config(&self) -> &PostgresConfig {
        &self.config
    }
481
    /// Returns a clone of the connection pool handle.
    pub(crate) fn pool(&self) -> Pool {
        self.pool.clone()
    }
488
    /// Borrows the shared SearchParameter registry handle.
    pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
        &self.search_registry
    }
493
    /// Borrows the SearchParameter extractor handle.
    pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
        &self.search_extractor
    }
498
    /// Returns the current value of the `search_offloaded` configuration flag.
    pub fn is_search_offloaded(&self) -> bool {
        self.config.search_offloaded
    }
503
    /// Overwrites the `search_offloaded` configuration flag.
    pub fn set_search_offloaded(&mut self, offloaded: bool) {
        self.config.search_offloaded = offloaded;
    }
508}
509
/// Newtype around a pooled `deadpool_postgres::Client`; this is the
/// `Backend::Connection` type for `PostgresBackend`.
#[allow(dead_code)]
pub struct PostgresConnection(pub(crate) deadpool_postgres::Client);
513
514impl Debug for PostgresConnection {
515 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
516 f.debug_struct("PostgresConnection").finish()
517 }
518}
519
520#[async_trait]
521impl Backend for PostgresBackend {
522 type Connection = PostgresConnection;
523
    /// Identifies this backend as `BackendKind::Postgres`.
    fn kind(&self) -> BackendKind {
        BackendKind::Postgres
    }
527
    /// Static backend name; the same string is used as `backend_name` in errors.
    fn name(&self) -> &'static str {
        "postgres"
    }
531
532 fn supports(&self, capability: BackendCapability) -> bool {
533 matches!(
534 capability,
535 BackendCapability::Crud
536 | BackendCapability::Versioning
537 | BackendCapability::InstanceHistory
538 | BackendCapability::TypeHistory
539 | BackendCapability::SystemHistory
540 | BackendCapability::BasicSearch
541 | BackendCapability::DateSearch
542 | BackendCapability::ReferenceSearch
543 | BackendCapability::FullTextSearch
544 | BackendCapability::Sorting
545 | BackendCapability::OffsetPagination
546 | BackendCapability::CursorPagination
547 | BackendCapability::Transactions
548 | BackendCapability::OptimisticLocking
549 | BackendCapability::PessimisticLocking
550 | BackendCapability::BulkExport
551 | BackendCapability::BulkSubmitIngest
552 | BackendCapability::BulkSubmitRestWorker
553 | BackendCapability::Include
554 | BackendCapability::Revinclude
555 | BackendCapability::SharedSchema
556 | BackendCapability::SchemaPerTenant
557 | BackendCapability::DatabasePerTenant
558 )
559 }
560
561 fn capabilities(&self) -> Vec<BackendCapability> {
562 vec![
563 BackendCapability::Crud,
564 BackendCapability::Versioning,
565 BackendCapability::InstanceHistory,
566 BackendCapability::TypeHistory,
567 BackendCapability::SystemHistory,
568 BackendCapability::BasicSearch,
569 BackendCapability::DateSearch,
570 BackendCapability::ReferenceSearch,
571 BackendCapability::FullTextSearch,
572 BackendCapability::Sorting,
573 BackendCapability::OffsetPagination,
574 BackendCapability::CursorPagination,
575 BackendCapability::Transactions,
576 BackendCapability::OptimisticLocking,
577 BackendCapability::PessimisticLocking,
578 BackendCapability::BulkExport,
579 BackendCapability::BulkSubmitIngest,
580 BackendCapability::BulkSubmitRestWorker,
581 BackendCapability::Include,
582 BackendCapability::Revinclude,
583 BackendCapability::SharedSchema,
584 BackendCapability::SchemaPerTenant,
585 BackendCapability::DatabasePerTenant,
586 ]
587 }
588
589 async fn acquire(&self) -> Result<Self::Connection, BackendError> {
590 let client = self
591 .pool
592 .get()
593 .await
594 .map_err(|e| BackendError::ConnectionFailed {
595 backend_name: "postgres".to_string(),
596 message: e.to_string(),
597 })?;
598 Ok(PostgresConnection(client))
599 }
600
601 async fn release(&self, _conn: Self::Connection) {
602 }
604
605 async fn health_check(&self) -> Result<(), BackendError> {
606 let client = self
607 .pool
608 .get()
609 .await
610 .map_err(|_| BackendError::Unavailable {
611 backend_name: "postgres".to_string(),
612 message: "Failed to get connection".to_string(),
613 })?;
614 client
615 .query_one("SELECT 1", &[])
616 .await
617 .map_err(|e| BackendError::Internal {
618 backend_name: "postgres".to_string(),
619 message: format!("Health check failed: {}", e),
620 source: None,
621 })?;
622 Ok(())
623 }
624
625 async fn initialize(&self) -> Result<(), BackendError> {
626 self.init_schema()
627 .await
628 .map_err(|e| BackendError::Internal {
629 backend_name: "postgres".to_string(),
630 message: format!("Failed to initialize schema: {}", e),
631 source: None,
632 })
633 }
634
635 async fn migrate(&self) -> Result<(), BackendError> {
636 self.init_schema()
637 .await
638 .map_err(|e| BackendError::Internal {
639 backend_name: "postgres".to_string(),
640 message: format!("Failed to run migrations: {}", e),
641 source: None,
642 })
643 }
644}
645
646use crate::core::capabilities::{
651 GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
652};
653use crate::types::{
654 IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
655 SearchParamType, SpecialSearchParam,
656};
657
658impl SearchCapabilityProvider for PostgresBackend {
659 fn resource_search_capabilities(
660 &self,
661 resource_type: &str,
662 ) -> Option<ResourceSearchCapabilities> {
663 let params = {
664 let registry = self.search_registry.read();
665 registry.get_active_params(resource_type)
666 };
667
668 if params.is_empty() {
669 let common_params = {
670 let registry = self.search_registry.read();
671 registry.get_active_params("Resource")
672 };
673 if common_params.is_empty() {
674 return None;
675 }
676 }
677
678 let mut search_params = Vec::new();
679 for param in ¶ms {
680 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
681 .with_definition(¶m.url);
682 let modifiers = Self::modifiers_for_type(param.param_type);
683 cap = cap.with_modifiers(modifiers);
684 if let Some(ref targets) = param.target {
685 cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
686 }
687 search_params.push(cap);
688 }
689
690 let common_params = {
691 let registry = self.search_registry.read();
692 registry.get_active_params("Resource")
693 };
694 for param in &common_params {
695 if !search_params.iter().any(|p| p.name == param.code) {
696 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
697 .with_definition(¶m.url);
698 cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
699 search_params.push(cap);
700 }
701 }
702
703 Some(
704 ResourceSearchCapabilities::new(resource_type)
705 .with_special_params(vec![
706 SpecialSearchParam::Id,
707 SpecialSearchParam::LastUpdated,
708 SpecialSearchParam::Tag,
709 SpecialSearchParam::Profile,
710 SpecialSearchParam::Security,
711 ])
712 .with_include_capabilities(vec![
713 IncludeCapability::Include,
714 IncludeCapability::Revinclude,
715 ])
716 .with_pagination_capabilities(vec![
717 PaginationCapability::Count,
718 PaginationCapability::Offset,
719 PaginationCapability::Cursor,
720 PaginationCapability::MaxPageSize(1000),
721 PaginationCapability::DefaultPageSize(20),
722 ])
723 .with_result_mode_capabilities(vec![
724 ResultModeCapability::Total,
725 ResultModeCapability::TotalNone,
726 ResultModeCapability::TotalAccurate,
727 ResultModeCapability::SummaryCount,
728 ])
729 .with_param_list(search_params),
730 )
731 }
732
733 fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
734 GlobalSearchCapabilities::new()
735 .with_special_params(vec![
736 SpecialSearchParam::Id,
737 SpecialSearchParam::LastUpdated,
738 SpecialSearchParam::Tag,
739 SpecialSearchParam::Profile,
740 SpecialSearchParam::Security,
741 ])
742 .with_pagination(vec![
743 PaginationCapability::Count,
744 PaginationCapability::Offset,
745 PaginationCapability::Cursor,
746 PaginationCapability::MaxPageSize(1000),
747 PaginationCapability::DefaultPageSize(20),
748 ])
749 .with_system_search()
750 }
751}
752
753impl PostgresBackend {
754 pub(super) fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
756 match param_type {
757 SearchParamType::String => vec!["exact", "contains", "text", "missing"],
758 SearchParamType::Token => {
762 vec!["not", "text", "code-text", "in", "of-type", "missing"]
763 }
764 SearchParamType::Reference => vec![
765 "identifier",
766 "contains",
767 "text",
768 "code-text",
769 "below",
770 "above",
771 "missing",
772 ],
773 SearchParamType::Date => vec!["missing"],
774 SearchParamType::Number => vec!["missing"],
775 SearchParamType::Quantity => vec!["missing"],
776 SearchParamType::Uri => vec!["contains", "below", "above", "missing"],
777 SearchParamType::Composite => vec!["missing"],
778 SearchParamType::Special => vec![],
779 }
780 }
781}