Skip to main content

helios_persistence/backends/postgres/
backend.rs

1//! PostgreSQL backend implementation.
2
3use std::fmt::Debug;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use deadpool_postgres::{Config, Pool, Runtime, SslMode};
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio_postgres::NoTls;
12
13use helios_fhir::FhirVersion;
14
15use crate::core::{Backend, BackendCapability, BackendKind};
16use crate::error::{BackendError, StorageResult};
17use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
18
19/// PostgreSQL backend for FHIR resource storage.
20pub struct PostgresBackend {
21    pool: Pool,
22    config: PostgresConfig,
23    /// Search parameter registry (in-memory cache of active parameters).
24    search_registry: Arc<RwLock<SearchParameterRegistry>>,
25    /// Extractor for deriving searchable values from resources.
26    search_extractor: Arc<SearchParameterExtractor>,
27}
28
29impl Debug for PostgresBackend {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("PostgresBackend")
32            .field("config", &self.config)
33            .field("search_registry_len", &self.search_registry.read().len())
34            .finish_non_exhaustive()
35    }
36}
37
38/// Configuration for the PostgreSQL backend.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PostgresConfig {
41    /// PostgreSQL host.
42    #[serde(default = "default_host")]
43    pub host: String,
44
45    /// PostgreSQL port.
46    #[serde(default = "default_port")]
47    pub port: u16,
48
49    /// Database name.
50    #[serde(default = "default_dbname")]
51    pub dbname: String,
52
53    /// Database user.
54    #[serde(default = "default_user")]
55    pub user: String,
56
57    /// Database password.
58    #[serde(default)]
59    pub password: Option<String>,
60
61    /// SSL mode.
62    #[serde(default)]
63    pub ssl_mode: PostgresSslMode,
64
65    /// Maximum number of connections in the pool.
66    #[serde(default = "default_max_connections")]
67    pub max_connections: usize,
68
69    /// Connection timeout in seconds.
70    #[serde(default = "default_connect_timeout_secs")]
71    pub connect_timeout_secs: u64,
72
73    /// Statement timeout in milliseconds.
74    #[serde(default = "default_statement_timeout_ms")]
75    pub statement_timeout_ms: u64,
76
77    /// FHIR version for this backend instance.
78    #[serde(default)]
79    pub fhir_version: FhirVersion,
80
81    /// Directory containing FHIR SearchParameter spec files.
82    #[serde(default)]
83    pub data_dir: Option<PathBuf>,
84
85    /// When true, search indexing is offloaded to a secondary backend.
86    #[serde(default)]
87    pub search_offloaded: bool,
88
89    /// Optional schema name for schema-per-tenant isolation.
90    #[serde(default)]
91    pub schema_name: Option<String>,
92}
93
94/// SSL mode for PostgreSQL connections.
95#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
96#[serde(rename_all = "lowercase")]
97pub enum PostgresSslMode {
98    /// Disable SSL.
99    Disable,
100    /// Prefer SSL, but allow non-SSL.
101    #[default]
102    Prefer,
103    /// Require SSL.
104    Require,
105}
106
/// Default PostgreSQL host name.
fn default_host() -> String {
    String::from("localhost")
}

/// Default PostgreSQL TCP port.
fn default_port() -> u16 {
    5_432
}

/// Default database name.
fn default_dbname() -> String {
    String::from("helios")
}

/// Default database user.
fn default_user() -> String {
    String::from("helios")
}

/// Default upper bound on pooled connections.
fn default_max_connections() -> usize {
    10
}

/// Default connection timeout, in seconds.
fn default_connect_timeout_secs() -> u64 {
    5
}

/// Default statement timeout, in milliseconds.
fn default_statement_timeout_ms() -> u64 {
    30_000
}
134
135impl Default for PostgresConfig {
136    fn default() -> Self {
137        Self {
138            host: default_host(),
139            port: default_port(),
140            dbname: default_dbname(),
141            user: default_user(),
142            password: None,
143            ssl_mode: PostgresSslMode::default(),
144            max_connections: default_max_connections(),
145            connect_timeout_secs: default_connect_timeout_secs(),
146            statement_timeout_ms: default_statement_timeout_ms(),
147            fhir_version: FhirVersion::default(),
148            data_dir: None,
149            search_offloaded: false,
150            schema_name: None,
151        }
152    }
153}
154
155impl PostgresBackend {
156    /// Creates a new PostgreSQL backend with the given configuration.
157    pub async fn new(config: PostgresConfig) -> StorageResult<Self> {
158        let pool = Self::create_pool(&config)?;
159
160        // Verify connectivity
161        let client = pool.get().await.map_err(|e| {
162            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
163                backend_name: "postgres".to_string(),
164                message: e.to_string(),
165            })
166        })?;
167
168        // Set statement timeout
169        client
170            .execute(
171                &format!("SET statement_timeout = {}", config.statement_timeout_ms),
172                &[],
173            )
174            .await
175            .map_err(|e| {
176                crate::error::StorageError::Backend(BackendError::Internal {
177                    backend_name: "postgres".to_string(),
178                    message: format!("Failed to set statement_timeout: {}", e),
179                    source: None,
180                })
181            })?;
182
183        drop(client);
184
185        // Initialize the search parameter registry
186        let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
187        Self::initialize_search_registry(&search_registry, &config);
188        let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
189
190        Ok(Self {
191            pool,
192            config,
193            search_registry,
194            search_extractor,
195        })
196    }
197
198    /// Creates a backend from a connection string.
199    pub async fn from_connection_string(url: &str) -> StorageResult<Self> {
200        let config = Self::parse_connection_string(url)?;
201        Self::new(config).await
202    }
203
204    /// Creates a backend from environment variables.
205    ///
206    /// Reads the following environment variables:
207    /// - `HFS_PG_HOST` (default: "localhost")
208    /// - `HFS_PG_PORT` (default: 5432)
209    /// - `HFS_PG_DBNAME` (default: "helios")
210    /// - `HFS_PG_USER` (default: "helios")
211    /// - `HFS_PG_PASSWORD`
212    /// - `HFS_PG_MAX_CONNECTIONS` (default: 10)
213    pub async fn from_env() -> StorageResult<Self> {
214        let config = PostgresConfig {
215            host: std::env::var("HFS_PG_HOST").unwrap_or_else(|_| default_host()),
216            port: std::env::var("HFS_PG_PORT")
217                .ok()
218                .and_then(|p| p.parse().ok())
219                .unwrap_or_else(default_port),
220            dbname: std::env::var("HFS_PG_DBNAME").unwrap_or_else(|_| default_dbname()),
221            user: std::env::var("HFS_PG_USER").unwrap_or_else(|_| default_user()),
222            password: std::env::var("HFS_PG_PASSWORD").ok(),
223            max_connections: std::env::var("HFS_PG_MAX_CONNECTIONS")
224                .ok()
225                .and_then(|p| p.parse().ok())
226                .unwrap_or_else(default_max_connections),
227            ..Default::default()
228        };
229        Self::new(config).await
230    }
231
232    fn create_pool(config: &PostgresConfig) -> StorageResult<Pool> {
233        let mut cfg = Config::new();
234        cfg.host = Some(config.host.clone());
235        cfg.port = Some(config.port);
236        cfg.dbname = Some(config.dbname.clone());
237        cfg.user = Some(config.user.clone());
238        cfg.password = config.password.clone();
239        cfg.ssl_mode = Some(match config.ssl_mode {
240            PostgresSslMode::Disable => SslMode::Disable,
241            PostgresSslMode::Prefer => SslMode::Prefer,
242            PostgresSslMode::Require => SslMode::Require,
243        });
244
245        let pool = cfg
246            .builder(NoTls)
247            .map_err(|e| {
248                crate::error::StorageError::Backend(BackendError::Internal {
249                    backend_name: "postgres".to_string(),
250                    message: format!("Failed to create pool builder: {}", e),
251                    source: None,
252                })
253            })?
254            .max_size(config.max_connections)
255            .runtime(Runtime::Tokio1)
256            .build()
257            .map_err(|e| {
258                crate::error::StorageError::Backend(BackendError::ConnectionFailed {
259                    backend_name: "postgres".to_string(),
260                    message: e.to_string(),
261                })
262            })?;
263
264        Ok(pool)
265    }
266
267    fn parse_connection_string(url: &str) -> StorageResult<PostgresConfig> {
268        // Parse postgres:// URL format
269        // postgres://user:password@host:port/dbname
270        let url = url
271            .strip_prefix("postgres://")
272            .or_else(|| url.strip_prefix("postgresql://"))
273            .unwrap_or(url);
274
275        let mut config = PostgresConfig::default();
276
277        // Split user:password@host:port/dbname
278        if let Some((userinfo, rest)) = url.split_once('@') {
279            if let Some((user, password)) = userinfo.split_once(':') {
280                config.user = user.to_string();
281                config.password = Some(password.to_string());
282            } else {
283                config.user = userinfo.to_string();
284            }
285
286            if let Some((hostport, dbname)) = rest.split_once('/') {
287                if let Some((host, port)) = hostport.split_once(':') {
288                    config.host = host.to_string();
289                    config.port = port.parse().unwrap_or(5432);
290                } else {
291                    config.host = hostport.to_string();
292                }
293                config.dbname = dbname.to_string();
294            } else if let Some((host, port)) = rest.split_once(':') {
295                config.host = host.to_string();
296                config.port = port.parse().unwrap_or(5432);
297            } else {
298                config.host = rest.to_string();
299            }
300        }
301
302        Ok(config)
303    }
304
305    fn initialize_search_registry(
306        registry: &Arc<RwLock<SearchParameterRegistry>>,
307        config: &PostgresConfig,
308    ) {
309        let loader = SearchParameterLoader::new(config.fhir_version);
310        let mut reg = registry.write();
311
312        let mut fallback_count = 0;
313        let mut spec_count = 0;
314        let mut spec_file: Option<PathBuf> = None;
315        let mut custom_count = 0;
316        let mut custom_files: Vec<String> = Vec::new();
317
318        // 1. Load minimal embedded fallback params
319        match loader.load_embedded() {
320            Ok(params) => {
321                for param in params {
322                    if reg.register(param).is_ok() {
323                        fallback_count += 1;
324                    }
325                }
326            }
327            Err(e) => {
328                tracing::error!("Failed to load embedded SearchParameters: {}", e);
329            }
330        }
331
332        // 2. Load spec file params
333        let data_dir = config
334            .data_dir
335            .clone()
336            .unwrap_or_else(|| PathBuf::from("./data"));
337        let spec_filename = loader.spec_filename();
338        let spec_path = data_dir.join(spec_filename);
339        match loader.load_from_spec_file(&data_dir) {
340            Ok(params) => {
341                for param in params {
342                    if reg.register(param).is_ok() {
343                        spec_count += 1;
344                    }
345                }
346                if spec_count > 0 {
347                    spec_file = Some(spec_path);
348                }
349            }
350            Err(e) => {
351                tracing::warn!(
352                    "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
353                    spec_path.display(),
354                    e
355                );
356            }
357        }
358
359        // 3. Load custom SearchParameters
360        match loader.load_custom_from_directory_with_files(&data_dir) {
361            Ok((params, files)) => {
362                for param in params {
363                    if reg.register(param).is_ok() {
364                        custom_count += 1;
365                    }
366                }
367                custom_files = files;
368            }
369            Err(e) => {
370                tracing::warn!(
371                    "Error loading custom SearchParameters from {}: {}",
372                    data_dir.display(),
373                    e
374                );
375            }
376        }
377
378        let resource_type_count = reg.resource_types().len();
379        let spec_info = spec_file
380            .map(|p| format!(" from {}", p.display()))
381            .unwrap_or_default();
382        let custom_info = if custom_files.is_empty() {
383            String::new()
384        } else {
385            format!(" [{}]", custom_files.join(", "))
386        };
387        tracing::info!(
388            "PostgreSQL SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
389            reg.len(),
390            spec_count,
391            spec_info,
392            fallback_count,
393            custom_count,
394            custom_info,
395            resource_type_count
396        );
397    }
398
399    /// Initialize the database schema.
400    pub async fn init_schema(&self) -> StorageResult<()> {
401        let client = self.get_client().await?;
402        super::schema::initialize_schema(&client).await?;
403
404        // Load stored SearchParameters from database
405        let stored_count = self.load_stored_search_parameters().await?;
406        if stored_count > 0 {
407            let registry = self.search_registry.read();
408            tracing::info!(
409                "Loaded {} stored SearchParameters from database (total now: {})",
410                stored_count,
411                registry.len()
412            );
413        }
414
415        Ok(())
416    }
417
418    /// Loads SearchParameter resources stored in the database into the registry.
419    async fn load_stored_search_parameters(&self) -> StorageResult<usize> {
420        use crate::search::registry::{SearchParameterSource, SearchParameterStatus};
421
422        let client = self.get_client().await?;
423        let rows = client
424            .query(
425                "SELECT data FROM resources WHERE resource_type = 'SearchParameter' AND is_deleted = FALSE",
426                &[],
427            )
428            .await
429            .map_err(|e| {
430                crate::error::StorageError::Backend(BackendError::Internal {
431                    backend_name: "postgres".to_string(),
432                    message: format!("Failed to query SearchParameters: {}", e),
433                    source: None,
434                })
435            })?;
436
437        let loader = SearchParameterLoader::new(self.config.fhir_version);
438        let mut registry = self.search_registry.write();
439        let mut count = 0;
440
441        for row in rows {
442            let data: serde_json::Value = row.get(0);
443            match loader.parse_resource(&data) {
444                Ok(mut def) => {
445                    if def.status == SearchParameterStatus::Active {
446                        def.source = SearchParameterSource::Stored;
447                        if registry.register(def).is_ok() {
448                            count += 1;
449                        }
450                    }
451                }
452                Err(e) => {
453                    tracing::warn!("Failed to parse stored SearchParameter: {}", e);
454                }
455            }
456        }
457
458        Ok(count)
459    }
460
461    /// Get a client from the pool.
462    pub(crate) async fn get_client(&self) -> StorageResult<deadpool_postgres::Client> {
463        self.pool.get().await.map_err(|e| {
464            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
465                backend_name: "postgres".to_string(),
466                message: e.to_string(),
467            })
468        })
469    }
470
471    /// Get the search parameter registry.
472    #[allow(dead_code)]
473    pub(crate) fn get_search_registry(&self) -> Arc<RwLock<SearchParameterRegistry>> {
474        Arc::clone(&self.search_registry)
475    }
476
477    /// Returns the backend configuration.
478    pub fn config(&self) -> &PostgresConfig {
479        &self.config
480    }
481
482    /// Returns a reference to the search parameter registry.
483    pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
484        &self.search_registry
485    }
486
487    /// Returns a reference to the search parameter extractor.
488    pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
489        &self.search_extractor
490    }
491
492    /// Returns whether search indexing is offloaded to a secondary backend.
493    pub fn is_search_offloaded(&self) -> bool {
494        self.config.search_offloaded
495    }
496
497    /// Sets the search offloaded flag.
498    pub fn set_search_offloaded(&mut self, offloaded: bool) {
499        self.config.search_offloaded = offloaded;
500    }
501}
502
503/// Connection wrapper for PostgreSQL.
504#[allow(dead_code)]
505pub struct PostgresConnection(pub(crate) deadpool_postgres::Client);
506
507impl Debug for PostgresConnection {
508    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509        f.debug_struct("PostgresConnection").finish()
510    }
511}
512
513#[async_trait]
514impl Backend for PostgresBackend {
515    type Connection = PostgresConnection;
516
517    fn kind(&self) -> BackendKind {
518        BackendKind::Postgres
519    }
520
521    fn name(&self) -> &'static str {
522        "postgres"
523    }
524
525    fn supports(&self, capability: BackendCapability) -> bool {
526        matches!(
527            capability,
528            BackendCapability::Crud
529                | BackendCapability::Versioning
530                | BackendCapability::InstanceHistory
531                | BackendCapability::TypeHistory
532                | BackendCapability::SystemHistory
533                | BackendCapability::BasicSearch
534                | BackendCapability::DateSearch
535                | BackendCapability::ReferenceSearch
536                | BackendCapability::FullTextSearch
537                | BackendCapability::Sorting
538                | BackendCapability::OffsetPagination
539                | BackendCapability::CursorPagination
540                | BackendCapability::Transactions
541                | BackendCapability::OptimisticLocking
542                | BackendCapability::PessimisticLocking
543                | BackendCapability::Include
544                | BackendCapability::Revinclude
545                | BackendCapability::SharedSchema
546                | BackendCapability::SchemaPerTenant
547                | BackendCapability::DatabasePerTenant
548        )
549    }
550
551    fn capabilities(&self) -> Vec<BackendCapability> {
552        vec![
553            BackendCapability::Crud,
554            BackendCapability::Versioning,
555            BackendCapability::InstanceHistory,
556            BackendCapability::TypeHistory,
557            BackendCapability::SystemHistory,
558            BackendCapability::BasicSearch,
559            BackendCapability::DateSearch,
560            BackendCapability::ReferenceSearch,
561            BackendCapability::FullTextSearch,
562            BackendCapability::Sorting,
563            BackendCapability::OffsetPagination,
564            BackendCapability::CursorPagination,
565            BackendCapability::Transactions,
566            BackendCapability::OptimisticLocking,
567            BackendCapability::PessimisticLocking,
568            BackendCapability::Include,
569            BackendCapability::Revinclude,
570            BackendCapability::SharedSchema,
571            BackendCapability::SchemaPerTenant,
572            BackendCapability::DatabasePerTenant,
573        ]
574    }
575
576    async fn acquire(&self) -> Result<Self::Connection, BackendError> {
577        let client = self
578            .pool
579            .get()
580            .await
581            .map_err(|e| BackendError::ConnectionFailed {
582                backend_name: "postgres".to_string(),
583                message: e.to_string(),
584            })?;
585        Ok(PostgresConnection(client))
586    }
587
588    async fn release(&self, _conn: Self::Connection) {
589        // Connection is automatically returned to pool when dropped
590    }
591
592    async fn health_check(&self) -> Result<(), BackendError> {
593        let client = self
594            .pool
595            .get()
596            .await
597            .map_err(|_| BackendError::Unavailable {
598                backend_name: "postgres".to_string(),
599                message: "Failed to get connection".to_string(),
600            })?;
601        client
602            .query_one("SELECT 1", &[])
603            .await
604            .map_err(|e| BackendError::Internal {
605                backend_name: "postgres".to_string(),
606                message: format!("Health check failed: {}", e),
607                source: None,
608            })?;
609        Ok(())
610    }
611
612    async fn initialize(&self) -> Result<(), BackendError> {
613        self.init_schema()
614            .await
615            .map_err(|e| BackendError::Internal {
616                backend_name: "postgres".to_string(),
617                message: format!("Failed to initialize schema: {}", e),
618                source: None,
619            })
620    }
621
622    async fn migrate(&self) -> Result<(), BackendError> {
623        self.init_schema()
624            .await
625            .map_err(|e| BackendError::Internal {
626                backend_name: "postgres".to_string(),
627                message: format!("Failed to run migrations: {}", e),
628                source: None,
629            })
630    }
631}
632
633// ============================================================================
634// SearchCapabilityProvider Implementation
635// ============================================================================
636
637use crate::core::capabilities::{
638    GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
639};
640use crate::types::{
641    IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
642    SearchParamType, SpecialSearchParam,
643};
644
645impl SearchCapabilityProvider for PostgresBackend {
646    fn resource_search_capabilities(
647        &self,
648        resource_type: &str,
649    ) -> Option<ResourceSearchCapabilities> {
650        let params = {
651            let registry = self.search_registry.read();
652            registry.get_active_params(resource_type)
653        };
654
655        if params.is_empty() {
656            let common_params = {
657                let registry = self.search_registry.read();
658                registry.get_active_params("Resource")
659            };
660            if common_params.is_empty() {
661                return None;
662            }
663        }
664
665        let mut search_params = Vec::new();
666        for param in &params {
667            let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
668                .with_definition(&param.url);
669            let modifiers = Self::modifiers_for_type(param.param_type);
670            cap = cap.with_modifiers(modifiers);
671            if let Some(ref targets) = param.target {
672                cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
673            }
674            search_params.push(cap);
675        }
676
677        let common_params = {
678            let registry = self.search_registry.read();
679            registry.get_active_params("Resource")
680        };
681        for param in &common_params {
682            if !search_params.iter().any(|p| p.name == param.code) {
683                let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
684                    .with_definition(&param.url);
685                cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
686                search_params.push(cap);
687            }
688        }
689
690        Some(
691            ResourceSearchCapabilities::new(resource_type)
692                .with_special_params(vec![
693                    SpecialSearchParam::Id,
694                    SpecialSearchParam::LastUpdated,
695                    SpecialSearchParam::Tag,
696                    SpecialSearchParam::Profile,
697                    SpecialSearchParam::Security,
698                ])
699                .with_include_capabilities(vec![
700                    IncludeCapability::Include,
701                    IncludeCapability::Revinclude,
702                ])
703                .with_pagination_capabilities(vec![
704                    PaginationCapability::Count,
705                    PaginationCapability::Offset,
706                    PaginationCapability::Cursor,
707                    PaginationCapability::MaxPageSize(1000),
708                    PaginationCapability::DefaultPageSize(20),
709                ])
710                .with_result_mode_capabilities(vec![
711                    ResultModeCapability::Total,
712                    ResultModeCapability::TotalNone,
713                    ResultModeCapability::TotalAccurate,
714                    ResultModeCapability::SummaryCount,
715                ])
716                .with_param_list(search_params),
717        )
718    }
719
720    fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
721        GlobalSearchCapabilities::new()
722            .with_special_params(vec![
723                SpecialSearchParam::Id,
724                SpecialSearchParam::LastUpdated,
725                SpecialSearchParam::Tag,
726                SpecialSearchParam::Profile,
727                SpecialSearchParam::Security,
728            ])
729            .with_pagination(vec![
730                PaginationCapability::Count,
731                PaginationCapability::Offset,
732                PaginationCapability::Cursor,
733                PaginationCapability::MaxPageSize(1000),
734                PaginationCapability::DefaultPageSize(20),
735            ])
736            .with_system_search()
737    }
738}
739
740impl PostgresBackend {
741    /// Returns supported modifiers for a parameter type.
742    fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
743        match param_type {
744            SearchParamType::String => vec!["exact", "contains", "missing"],
745            SearchParamType::Token => vec!["not", "text", "in", "not-in", "of-type", "missing"],
746            SearchParamType::Reference => vec!["identifier", "missing"],
747            SearchParamType::Date => vec!["missing"],
748            SearchParamType::Number => vec!["missing"],
749            SearchParamType::Quantity => vec!["missing"],
750            SearchParamType::Uri => vec!["below", "above", "missing"],
751            SearchParamType::Composite => vec!["missing"],
752            SearchParamType::Special => vec![],
753        }
754    }
755}